The configuration does not specify the checkpoint directory 'state.checkpoints.dir'

Question

While submitting the Flink job on the Dataproc cluster, I am getting the error below. Please find the code and the error. I am using Flink version 1.9.3.

 The program finished with the following exception:
org.apache.flink.client.program.ProgramInvocationException: Could not retrieve the execution result. (JobID: f064ceaa5b318fdad9a77b2723b9ee64)
at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:255)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:338)
at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:60)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1507)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1489)
at org.flink.ReadFromPubsub.main(ReadFromPubsub.java:28)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:604)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:466)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:274)
at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:746)
at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:273)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:205)
at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1008)
at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1081)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1926)
at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1081)
Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit JobGraph.
at org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$8(RestClusterClient.java:391)
at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:884)
at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:866)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990)
at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$8(FutureUtils.java:263)
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:575)
at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:943)
at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.flink.runtime.rest.util.RestClientException: [Internal server error., <Exception on server side:
org.apache.flink.runtime.client.JobSubmissionException: Failed to submit job.
at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$internalSubmitJob$2(Dispatcher.java:333)
at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:836)
at java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:811)
at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.RuntimeException: org.apache.flink.runtime.client.JobExecutionException: Could not set up JobManager
at org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:36)
at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
... 6 more
Caused by: org.apache.flink.runtime.client.JobExecutionException: Could not set up JobManager
at org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:152)
at org.apache.flink.runtime.dispatcher.DefaultJobManagerRunnerFactory.createJobManagerRunner(DefaultJobManagerRunnerFactory.java:83)
at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$5(Dispatcher.java:376)
at org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:34)
... 7 more
Caused by: org.apache.flink.runtime.client.JobExecutionException: Could not instantiate configured state backend
at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:303)
at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:106)
at org.apache.flink.runtime.scheduler.LegacyScheduler.createExecutionGraph(LegacyScheduler.java:207)
at org.apache.flink.runtime.scheduler.LegacyScheduler.createAndRestoreExecutionGraph(LegacyScheduler.java:184)
at org.apache.flink.runtime.scheduler.LegacyScheduler.<init>(LegacyScheduler.java:176)
at org.apache.flink.runtime.scheduler.LegacySchedulerFactory.createInstance(LegacySchedulerFactory.java:70)
at org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:278)
at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:266)
at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:98)
at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:40)
at org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:146)
... 10 more
Caused by: org.apache.flink.configuration.IllegalConfigurationException: Cannot create the RocksDB state backend: The configuration does not specify the checkpoint directory 'state.checkpoints.dir'
at org.apache.flink.contrib.streaming.state.RocksDBStateBackendFactory.createFromConfig(RocksDBStateBackendFactory.java:44)
at org.apache.flink.contrib.streaming.state.RocksDBStateBackendFactory.createFromConfig(RocksDBStateBackendFactory.java:32)
at org.apache.flink.runtime.state.StateBackendLoader.loadStateBackendFromConfig(StateBackendLoader.java:154)
at org.apache.flink.runtime.state.StateBackendLoader.fromApplicationOrConfigOrDefault(StateBackendLoader.java:219)
at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:299)
... 20 more
End of exception on server side>]
at org.apache.flink.runtime.rest.RestClient.parseResponse(RestClient.java:389)
at org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$3(RestClient.java:373)
at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:966)
at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:940)
... 4 more

Code snippet I executed:

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.gcp.pubsub.PubSubSource;

public class ReadFromPubsub
{
    public static void main(String args[]) throws Exception
    {
        System.out.println("Flink Pubsub Code Read 1");
        StreamExecutionEnvironment streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        System.out.println("Flink Pubsub Code Read 2");
        DeserializationSchema<String> deserializer = new SimpleStringSchema();
        System.out.println("Flink Pubsub Code Read 3");
        SourceFunction<String> pubsubSource = PubSubSource.newBuilder()
                                                          .withDeserializationSchema(deserializer)
                                                          .withProjectName("vz-it-np-gudv-dev-vzntdo-0")
                                                          .withSubscriptionName("subscription1")
                                                          .build();
        System.out.println("Flink Pubsub Code Read 4");
        streamExecEnv.addSource(pubsubSource).print();
        // Checkpointing every 10 ms; the configured state backend needs a checkpoint directory.
        streamExecEnv.enableCheckpointing(10);
        System.out.println("Flink Pubsub Code Read 5");
        streamExecEnv.execute();
    }
}

I can see all the print statements during the execution of the code. After the last print statement, I get the error.

All of the exceptions above need to be resolved.

Answer 1

Score: 0

Generally, you would supply the appropriate savepoint/checkpoint directory in your Flink configuration (flink-conf.yaml), as detailed in the docs. If you aren't currently setting it, you can do so in the following ways.

In flink-conf.yaml:

state.checkpoints.dir: "file:///example/checkpoints"
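
On a Dataproc cluster the directory should point at storage that the JobManager and every TaskManager can reach; a purely local file:// path is only appropriate for single-node setups. A hedged example, assuming the GCS connector is available to Flink and that your-bucket is a placeholder bucket name:

state.checkpoints.dir: gs://your-bucket/flink/checkpoints
# or, using the cluster's HDFS:
# state.checkpoints.dir: hdfs:///flink/checkpoints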

Programmatically (within the job):

Configuration configuration = new Configuration();
configuration.setString("state.checkpoints.dir", "file:///example/checkpoints");

StreamExecutionEnvironment streamExecEnv =
    StreamExecutionEnvironment.getExecutionEnvironment(configuration);
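
Note that the Configuration overload of getExecutionEnvironment may not be available on Flink 1.9, so an alternative is to set the state backend (and its checkpoint URI) directly on the environment. A minimal sketch, assuming the flink-statebackend-rocksdb dependency is on the classpath and gs://your-bucket/flink/checkpoints is a placeholder path:

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;

StreamExecutionEnvironment streamExecEnv =
    StreamExecutionEnvironment.getExecutionEnvironment();
// The URI passed to the backend is used as the checkpoint directory,
// so 'state.checkpoints.dir' no longer has to come from flink-conf.yaml.
streamExecEnv.setStateBackend(new RocksDBStateBackend("gs://your-bucket/flink/checkpoints"));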

Via the CLI:

flink run -Dstate.checkpoints.dir="/example/checkpoints" your-job.jar

Additionally, if you don't actually want to perform checkpointing, you can remove the following line from your job:

streamExecEnv.enableCheckpointing(10);
