Getting Async Task Checkpoint failed error with Flink Presto 1.14.0 library

Question

I am trying to use Flink S3 checkpointing with the Flink S3 Presto library. My S3-related Flink configuration is as follows:

    fs.allowed-fallback-filesystems: s3p
    state.backend: filesystem
    state.checkpoints.dir: s3p://gamma-flink-test-checkpoint/checkpoints/
    state.backend.fs.checkpointdir: s3p://gamma-flink-test-checkpoint/checkpoints/
    state.savepoints.dir: s3p://gamma-flink-test-checkpoint/savepoints/
    state.backend.fs.savepointdir: s3p://gamma-flink-test-checkpoint/savepoints/
    s3.connection.timeout: 600000
    s3.socket.timeout: 600000
    s3.connection.establish.timeout: 600000

About 40% of my checkpoints fail, and no exceptions are visible on the Flink dashboard. I see the following S3 read timeout exception in the job manager logs. Everything works with the hadoop-s3-fs library; the issue occurs only with the Presto library. I suspect it is linked to the amount of traffic my Flink job processes, because the failure rate dropped once I reduced the traffic to the job.

Caused by: org.apache.flink.util.SerializedThrowable: com.amazonaws.SdkClientException: Unable to execute HTTP request: Read timed out
        at com.facebook.presto.hive.s3.PrestoS3FileSystem$PrestoS3OutputStream.uploadObject(PrestoS3FileSystem.java:1278) ~[?:?]
        at com.facebook.presto.hive.s3.PrestoS3FileSystem$PrestoS3OutputStream.close(PrestoS3FileSystem.java:1226) ~[?:?]
        at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72) ~[hadoop-common-3.2.1-amzn-5.jar:?]
        at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:101) ~[hadoop-common-3.2.1-amzn-5.jar:?]
        at org.apache.flink.fs.s3presto.common.HadoopDataOutputStream.close(HadoopDataOutputStream.java:52) ~[?:?]
        at org.apache.flink.core.fs.ClosingFSDataOutputStream.close(ClosingFSDataOutputStream.java:64) ~[flink-dist_2.12-1.14.0.jar:1.14.0]
        at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:354) ~[flink-dist_2.12-1.14.0.jar:1.14.0]
        at org.apache.flink.runtime.state.CheckpointStreamWithResultProvider$PrimaryStreamOnly.closeAndFinalizeCheckpointStreamResult(CheckpointStreamWithResultProvider.java:75) ~[flink-dist_2.12-1.14.0.jar:1.14.0]
        at org.apache.flink.runtime.state.heap.HeapSnapshotStrategy.lambda$asyncSnapshot$3(HeapSnapshotStrategy.java:177) ~[flink-dist_2.12-1.14.0.jar:1.14.0]
        at org.apache.flink.runtime.state.SnapshotStrategyRunner$1.callInternal(SnapshotStrategyRunner.java:91) ~[flink-dist_2.12-1.14.0.jar:1.14.0]
        at org.apache.flink.runtime.state.SnapshotStrategyRunner$1.callInternal(SnapshotStrategyRunner.java:88) ~[flink-dist_2.12-1.14.0.jar:1.14.0]
        at org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:78) ~[flink-dist_2.12-1.14.0.jar:1.14.0]
        at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_372]
        at org.apache.flink.util.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:642) ~[flink-dist_2.12-1.14.0.jar:1.14.0]
        at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:54) ~[flink-dist_2.12-1.14.0.jar:1.14.0]
        at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.finalizeNonFinishedSnapshots(AsyncCheckpointRunnable.java:177) ~[flink-dist_2.12-1.14.0.jar:1.14.0]
        at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.run(AsyncCheckpointRunnable.java:124) ~[flink-dist_2.12-1.14.0.jar:1.14.0]
        ... 3 more
Caused by: org.apache.flink.util.SerializedThrowable: Unable to execute HTTP request: Read timed out
        at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleRetryableException(AmazonHttpClient.java:1207) ~[aws-java-sdk-bundle-1.12.31.jar:?]
        at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1153) ~[aws-java-sdk-bundle-1.12.31.jar:?]
        at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:802) ~[aws-java-sdk-bundle-1.12.31.jar:?]
        at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:770) ~[aws-java-sdk-bundle-1.12.31.jar:?]
        at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:744) ~[aws-java-sdk-bundle-1.12.31.jar:?]
        at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:704) ~[aws-java-sdk-bundle-1.12.31.jar:?]
        at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:686) ~[aws-java-sdk-bundle-1.12.31.jar:?]
        at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:550) ~[aws-java-sdk-bundle-1.12.31.jar:?]
        at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:530) ~[aws-java-sdk-bundle-1.12.31.jar:?]
        at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5259) ~[aws-java-sdk-bundle-1.12.31.jar:?]
        at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5206) ~[aws-java-sdk-bundle-1.12.31.jar:?]
        at com.amazonaws.services.s3.AmazonS3Client.access$300(AmazonS3Client.java:415) ~[aws-java-sdk-bundle-1.12.31.jar:?]
        at com.amazonaws.services.s3.AmazonS3Client$PutObjectStrategy.invokeServiceCall(AmazonS3Client.java:6320) ~[aws-java-sdk-bundle-1.12.31.jar:?]
        at com.amazonaws.services.s3.AmazonS3Client.uploadObject(AmazonS3Client.java:1840) ~[aws-java-sdk-bundle-1.12.31.jar:?]
        at com.amazonaws.services.s3.AmazonS3Client.putObject(AmazonS3Client.java:1800) ~[aws-java-sdk-bundle-1.12.31.jar:?]
        at com.amazonaws.services.s3.transfer.internal.UploadCallable.uploadInOneChunk(UploadCallable.java:168) ~[aws-java-sdk-bundle-1.12.31.jar:?]
        at com.amazonaws.services.s3.transfer.internal.UploadCallable.call(UploadCallable.java:148) ~[aws-java-sdk-bundle-1.12.31.jar:?]
        at com.amazonaws.services.s3.transfer.internal.UploadMonitor.call(UploadMonitor.java:115) ~[aws-java-sdk-bundle-1.12.31.jar:?]
        at com.amazonaws.services.s3.transfer.internal.UploadMonitor.call(UploadMonitor.java:45) ~[aws-java-sdk-bundle-1.12.31.jar:?]
        at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_372]
        ... 3 more

Task manager logs:

2023-06-02 05:44:34,258 WARN                 FsCheckpointStreamFactory -  - Could not delete the checkpoint stream file s3p://prod-flink-test-checkpoint/checkpoints/755723110b30dcf1c3d21d27a59ec994/chk-5/1abecc87-aaed-4e5a-90ce-e1b412c9a1a7.
java.lang.RuntimeException: java.lang.InterruptedException: sleep interrupted
        at com.facebook.presto.hive.RetryDriver.run(RetryDriver.java:164) ~[blob_p-d7a21b26c1db94b9d191a3f56ca4a2799b682e9f-d80b4f3b988836c5eee9e171e4e35060:?]
        at com.facebook.presto.hive.s3.PrestoS3FileSystem.getS3ObjectMetadata(PrestoS3FileSystem.java:658) ~[blob_p-d7a21b26c1db94b9d191a3f56ca4a2799b682e9f-d80b4f3b988836c5eee9e171e4e35060:?]
        at com.facebook.presto.hive.s3.PrestoS3FileSystem.getS3ObjectMetadata(PrestoS3FileSystem.java:642) ~[blob_p-d7a21b26c1db94b9d191a3f56ca4a2799b682e9f-d80b4f3b988836c5eee9e171e4e35060:?]
        at com.facebook.presto.hive.s3.PrestoS3FileSystem.getFileStatus(PrestoS3FileSystem.java:353) ~[blob_p-d7a21b26c1db94b9d191a3f56ca4a2799b682e9f-d80b4f3b988836c5eee9e171e4e35060:?]
        at com.facebook.presto.hive.s3.PrestoS3FileSystem.directory(PrestoS3FileSystem.java:502) ~[blob_p-d7a21b26c1db94b9d191a3f56ca4a2799b682e9f-d80b4f3b988836c5eee9e171e4e35060:?]
        at com.facebook.presto.hive.s3.PrestoS3FileSystem.delete(PrestoS3FileSystem.java:479) ~[blob_p-d7a21b26c1db94b9d191a3f56ca4a2799b682e9f-d80b4f3b988836c5eee9e171e4e35060:?]
        at org.apache.flink.fs.s3presto.common.HadoopFileSystem.delete(HadoopFileSystem.java:160) ~[blob_p-d7a21b26c1db94b9d191a3f56ca4a2799b682e9f-d80b4f3b988836c5eee9e171e4e35060:?]
        at org.apache.flink.core.fs.PluginFileSystemFactory$ClassLoaderFixingFileSystem.delete(PluginFileSystemFactory.java:155) ~[flink-dist_2.12-1.14.0.jar:1.14.0]
        at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.delete(SafetyNetWrapperFileSystem.java:109) ~[flink-dist_2.12-1.14.0.jar:1.14.0]
        at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:363) ~[flink-dist_2.12-1.14.0.jar:1.14.0]
        at org.apache.flink.runtime.state.CheckpointStreamWithResultProvider$PrimaryStreamOnly.closeAndFinalizeCheckpointStreamResult(CheckpointStreamWithResultProvider.java:75) ~[flink-dist_2.12-1.14.0.jar:1.14.0]
        at org.apache.flink.runtime.state.heap.HeapSnapshotStrategy.lambda$asyncSnapshot$3(HeapSnapshotStrategy.java:177) ~[flink-dist_2.12-1.14.0.jar:1.14.0]
        at org.apache.flink.runtime.state.SnapshotStrategyRunner$1.callInternal(SnapshotStrategyRunner.java:91) [flink-dist_2.12-1.14.0.jar:1.14.0]
        at org.apache.flink.runtime.state.SnapshotStrategyRunner$1.callInternal(SnapshotStrategyRunner.java:88) [flink-dist_2.12-1.14.0.jar:1.14.0]
        at org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:78) [flink-dist_2.12-1.14.0.jar:1.14.0]
        at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_372]
        at org.apache.flink.util.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:642) [flink-dist_2.12-1.14.0.jar:1.14.0]
        at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:54) [flink-dist_2.12-1.14.0.jar:1.14.0]
        at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.finalizeNonFinishedSnapshots(AsyncCheckpointRunnable.java:177) [flink-dist_2.12-1.14.0.jar:1.14.0]
        at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.run(AsyncCheckpointRunnable.java:124) [flink-dist_2.12-1.14.0.jar:1.14.0]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_372]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_372]
        at java.lang.Thread.run(Thread.java:750) [?:1.8.0_372]
        Suppressed: com.amazonaws.AbortedException:
                at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleInterruptedException(AmazonHttpClient.java:868) ~[blob_p-d7a21b26c1db94b9d191a3f56ca4a2799b682e9f-d80b4f3b988836c5eee9e171e4e35060:?]
                at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:746) ~[blob_p-d7a21b26c1db94b9d191a3f56ca4a2799b682e9f-d80b4f3b988836c5eee9e171e4e35060:?]
                at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:704) ~[blob_p-d7a21b26c1db94b9d191a3f56ca4a2799b682e9f-d80b4f3b988836c5eee9e171e4e35060:?]
                at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:686) ~[blob_p-d7a21b26c1db94b9d191a3f56ca4a2799b682e9f-d80b4f3b988836c5eee9e171e4e35060:?]
                at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:550) ~[blob_p-d7a21b26c1db94b9d191a3f56ca4a2799b682e9f-d80b4f3b988836c5eee9e171e4e35060:?]
                at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:530) ~[blob_p-d7a21b26c1db94b9d191a3f56ca4a2799b682e9f-d80b4f3b988836c5eee9e171e4e35060:?]
                at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5259) ~[blob_p-d7a21b26c1db94b9d191a3f56ca4a2799b682e9f-d80b4f3b988836c5eee9e171e4e35060:?]

Answer 1

Score: 0


The S3 socket timeout you set is 10 minutes, which should be more than enough given that the default timeout is around 5 seconds, IIRC.
That's why I doubt whether the value in the config is actually being applied.

Can you try using the following config instead: presto.s3.socket-timeout: 600000?

Answer 2

Score: 0


Per https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/filesystems/s3/, the Flink Presto implementation uses the same configuration keys as the Presto file system, which are documented at https://prestodb.io/docs/0.272/connector/hive.html#amazon-s3-configuration. The configuration keys differ between Presto and Hadoop. In Presto, there is no:

  • s3.connection.timeout, but there is s3.connect-timeout
  • s3.socket.timeout, but there is s3.socket-timeout
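
Applied to the configuration from the question, the key names above might look like the following sketch. It assumes that Flink forwards s3.* keys to the Presto file system unchanged in name, as the linked Flink page describes. The 10m values are also an assumption: Presto documents these timeouts as durations (for example "5 seconds"), so a value with a unit may be required instead of the bare 600000 used in the question. Verify both points against the linked documentation before relying on this.

```yaml
# flink-conf.yaml (sketch, not a verified configuration)
state.checkpoints.dir: s3p://gamma-flink-test-checkpoint/checkpoints/
state.savepoints.dir: s3p://gamma-flink-test-checkpoint/savepoints/

# Presto-style key names (hyphenated), replacing the dotted
# s3.connection.timeout / s3.socket.timeout keys from the question.
s3.connect-timeout: 10m   # assumption: duration value with a unit
s3.socket-timeout: 10m    # assumption: duration value with a unit
```

If checkpoints still fail with these keys, the task manager logs at startup are a reasonable place to check whether the timeout values were actually picked up.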

huangapple
  • Posted on June 2, 2023, 10:37:10
  • When reposting, please keep the link to this article: https://go.coder-hub.com/76386805.html