Getting Async Task Checkpoint failed error with Flink Presto 1.14.0 library
Question
I am trying to use Flink S3 checkpointing with the Flink s3 presto library. The following are my S3-related Flink configs:
fs.allowed-fallback-filesystems: s3p
state.backend: filesystem
state.checkpoints.dir: s3p://gamma-flink-test-checkpoint/checkpoints/
state.backend.fs.checkpointdir: s3p://gamma-flink-test-checkpoint/checkpoints/
state.savepoints.dir: s3p://gamma-flink-test-checkpoint/savepoints/
state.backend.fs.savepointdir: s3p://gamma-flink-test-checkpoint/savepoints/
s3.connection.timeout: 600000
s3.socket.timeout: 600000
s3.connection.establish.timeout: 600000
40% of the time my checkpoints fail, and no exceptions are visible on the Flink dashboard. I am seeing the following S3 read timeout exception in the job manager logs. Everything works with the hadoop-s3-fs library; this issue only occurs with the Presto library. I suspect it is linked to the amount of traffic processed by my Flink job, because the exception rate dropped once I lowered the traffic in my job.
Caused by: org.apache.flink.util.SerializedThrowable: com.amazonaws.SdkClientException: Unable to execute HTTP request: Read timed out
at com.facebook.presto.hive.s3.PrestoS3FileSystem$PrestoS3OutputStream.uploadObject(PrestoS3FileSystem.java:1278) ~[?:?]
at com.facebook.presto.hive.s3.PrestoS3FileSystem$PrestoS3OutputStream.close(PrestoS3FileSystem.java:1226) ~[?:?]
at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72) ~[hadoop-common-3.2.1-amzn-5.jar:?]
at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:101) ~[hadoop-common-3.2.1-amzn-5.jar:?]
at org.apache.flink.fs.s3presto.common.HadoopDataOutputStream.close(HadoopDataOutputStream.java:52) ~[?:?]
at org.apache.flink.core.fs.ClosingFSDataOutputStream.close(ClosingFSDataOutputStream.java:64) ~[flink-dist_2.12-1.14.0.jar:1.14.0]
at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:354) ~[flink-dist_2.12-1.14.0.jar:1.14.0]
at org.apache.flink.runtime.state.CheckpointStreamWithResultProvider$PrimaryStreamOnly.closeAndFinalizeCheckpointStreamResult(CheckpointStreamWithResultProvider.java:75) ~[flink-dist_2.12-1.14.0.jar:1.14.0]
at org.apache.flink.runtime.state.heap.HeapSnapshotStrategy.lambda$asyncSnapshot$3(HeapSnapshotStrategy.java:177) ~[flink-dist_2.12-1.14.0.jar:1.14.0]
at org.apache.flink.runtime.state.SnapshotStrategyRunner$1.callInternal(SnapshotStrategyRunner.java:91) ~[flink-dist_2.12-1.14.0.jar:1.14.0]
at org.apache.flink.runtime.state.SnapshotStrategyRunner$1.callInternal(SnapshotStrategyRunner.java:88) ~[flink-dist_2.12-1.14.0.jar:1.14.0]
at org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:78) ~[flink-dist_2.12-1.14.0.jar:1.14.0]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_372]
at org.apache.flink.util.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:642) ~[flink-dist_2.12-1.14.0.jar:1.14.0]
at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:54) ~[flink-dist_2.12-1.14.0.jar:1.14.0]
at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.finalizeNonFinishedSnapshots(AsyncCheckpointRunnable.java:177) ~[flink-dist_2.12-1.14.0.jar:1.14.0]
at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.run(AsyncCheckpointRunnable.java:124) ~[flink-dist_2.12-1.14.0.jar:1.14.0]
... 3 more
Caused by: org.apache.flink.util.SerializedThrowable: Unable to execute HTTP request: Read timed out
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleRetryableException(AmazonHttpClient.java:1207) ~[aws-java-sdk-bundle-1.12.31.jar:?]
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1153) ~[aws-java-sdk-bundle-1.12.31.jar:?]
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:802) ~[aws-java-sdk-bundle-1.12.31.jar:?]
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:770) ~[aws-java-sdk-bundle-1.12.31.jar:?]
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:744) ~[aws-java-sdk-bundle-1.12.31.jar:?]
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:704) ~[aws-java-sdk-bundle-1.12.31.jar:?]
at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:686) ~[aws-java-sdk-bundle-1.12.31.jar:?]
at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:550) ~[aws-java-sdk-bundle-1.12.31.jar:?]
at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:530) ~[aws-java-sdk-bundle-1.12.31.jar:?]
at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5259) ~[aws-java-sdk-bundle-1.12.31.jar:?]
at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5206) ~[aws-java-sdk-bundle-1.12.31.jar:?]
at com.amazonaws.services.s3.AmazonS3Client.access$300(AmazonS3Client.java:415) ~[aws-java-sdk-bundle-1.12.31.jar:?]
at com.amazonaws.services.s3.AmazonS3Client$PutObjectStrategy.invokeServiceCall(AmazonS3Client.java:6320) ~[aws-java-sdk-bundle-1.12.31.jar:?]
at com.amazonaws.services.s3.AmazonS3Client.uploadObject(AmazonS3Client.java:1840) ~[aws-java-sdk-bundle-1.12.31.jar:?]
at com.amazonaws.services.s3.AmazonS3Client.putObject(AmazonS3Client.java:1800) ~[aws-java-sdk-bundle-1.12.31.jar:?]
at com.amazonaws.services.s3.transfer.internal.UploadCallable.uploadInOneChunk(UploadCallable.java:168) ~[aws-java-sdk-bundle-1.12.31.jar:?]
at com.amazonaws.services.s3.transfer.internal.UploadCallable.call(UploadCallable.java:148) ~[aws-java-sdk-bundle-1.12.31.jar:?]
at com.amazonaws.services.s3.transfer.internal.UploadMonitor.call(UploadMonitor.java:115) ~[aws-java-sdk-bundle-1.12.31.jar:?]
at com.amazonaws.services.s3.transfer.internal.UploadMonitor.call(UploadMonitor.java:45) ~[aws-java-sdk-bundle-1.12.31.jar:?]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_372]
... 3 more
Task manager logs:
2023-06-02 05:44:34,258 WARN FsCheckpointStreamFactory - Could not delete the checkpoint stream file s3p://prod-flink-test-checkpoint/checkpoints/755723110b30dcf1c3d21d27a59ec994/chk-5/1abecc87-aaed-4e5a-90ce-e1b412c9a1a7.
java.lang.RuntimeException: java.lang.InterruptedException: sleep interrupted
at com.facebook.presto.hive.RetryDriver.run(RetryDriver.java:164) ~[blob_p-d7a21b26c1db94b9d191a3f56ca4a2799b682e9f-d80b4f3b988836c5eee9e171e4e35060:?]
at com.facebook.presto.hive.s3.PrestoS3FileSystem.getS3ObjectMetadata(PrestoS3FileSystem.java:658) ~[blob_p-d7a21b26c1db94b9d191a3f56ca4a2799b682e9f-d80b4f3b988836c5eee9e171e4e35060:?]
at com.facebook.presto.hive.s3.PrestoS3FileSystem.getS3ObjectMetadata(PrestoS3FileSystem.java:642) ~[blob_p-d7a21b26c1db94b9d191a3f56ca4a2799b682e9f-d80b4f3b988836c5eee9e171e4e35060:?]
at com.facebook.presto.hive.s3.PrestoS3FileSystem.getFileStatus(PrestoS3FileSystem.java:353) ~[blob_p-d7a21b26c1db94b9d191a3f56ca4a2799b682e9f-d80b4f3b988836c5eee9e171e4e35060:?]
at com.facebook.presto.hive.s3.PrestoS3FileSystem.directory(PrestoS3FileSystem.java:502) ~[blob_p-d7a21b26c1db94b9d191a3f56ca4a2799b682e9f-d80b4f3b988836c5eee9e171e4e35060:?]
at com.facebook.presto.hive.s3.PrestoS3FileSystem.delete(PrestoS3FileSystem.java:479) ~[blob_p-d7a21b26c1db94b9d191a3f56ca4a2799b682e9f-d80b4f3b988836c5eee9e171e4e35060:?]
at org.apache.flink.fs.s3presto.common.HadoopFileSystem.delete(HadoopFileSystem.java:160) ~[blob_p-d7a21b26c1db94b9d191a3f56ca4a2799b682e9f-d80b4f3b988836c5eee9e171e4e35060:?]
at org.apache.flink.core.fs.PluginFileSystemFactory$ClassLoaderFixingFileSystem.delete(PluginFileSystemFactory.java:155) ~[flink-dist_2.12-1.14.0.jar:1.14.0]
at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.delete(SafetyNetWrapperFileSystem.java:109) ~[flink-dist_2.12-1.14.0.jar:1.14.0]
at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:363) ~[flink-dist_2.12-1.14.0.jar:1.14.0]
at org.apache.flink.runtime.state.CheckpointStreamWithResultProvider$PrimaryStreamOnly.closeAndFinalizeCheckpointStreamResult(CheckpointStreamWithResultProvider.java:75) ~[flink-dist_2.12-1.14.0.jar:1.14.0]
at org.apache.flink.runtime.state.heap.HeapSnapshotStrategy.lambda$asyncSnapshot$3(HeapSnapshotStrategy.java:177) ~[flink-dist_2.12-1.14.0.jar:1.14.0]
at org.apache.flink.runtime.state.SnapshotStrategyRunner$1.callInternal(SnapshotStrategyRunner.java:91) [flink-dist_2.12-1.14.0.jar:1.14.0]
at org.apache.flink.runtime.state.SnapshotStrategyRunner$1.callInternal(SnapshotStrategyRunner.java:88) [flink-dist_2.12-1.14.0.jar:1.14.0]
at org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:78) [flink-dist_2.12-1.14.0.jar:1.14.0]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_372]
at org.apache.flink.util.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:642) [flink-dist_2.12-1.14.0.jar:1.14.0]
at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:54) [flink-dist_2.12-1.14.0.jar:1.14.0]
at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.finalizeNonFinishedSnapshots(AsyncCheckpointRunnable.java:177) [flink-dist_2.12-1.14.0.jar:1.14.0]
at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.run(AsyncCheckpointRunnable.java:124) [flink-dist_2.12-1.14.0.jar:1.14.0]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_372]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_372]
at java.lang.Thread.run(Thread.java:750) [?:1.8.0_372]
Suppressed: com.amazonaws.AbortedException:
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleInterruptedException(AmazonHttpClient.java:868) ~[blob_p-d7a21b26c1db94b9d191a3f56ca4a2799b682e9f-d80b4f3b988836c5eee9e171e4e35060:?]
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:746) ~[blob_p-d7a21b26c1db94b9d191a3f56ca4a2799b682e9f-d80b4f3b988836c5eee9e171e4e35060:?]
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:704) ~[blob_p-d7a21b26c1db94b9d191a3f56ca4a2799b682e9f-d80b4f3b988836c5eee9e171e4e35060:?]
at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:686) ~[blob_p-d7a21b26c1db94b9d191a3f56ca4a2799b682e9f-d80b4f3b988836c5eee9e171e4e35060:?]
at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:550) ~[blob_p-d7a21b26c1db94b9d191a3f56ca4a2799b682e9f-d80b4f3b988836c5eee9e171e4e35060:?]
at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:530) ~[blob_p-d7a21b26c1db94b9d191a3f56ca4a2799b682e9f-d80b4f3b988836c5eee9e171e4e35060:?]
at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5259) ~[blob_p-d7a21b26c1db94b9d191a3f56ca4a2799b682e9f-d80b4f3b988836c5eee9e171e4e35060:?]
Answer 1
Score: 0
The S3 socket timeout you set is 10 minutes, which should be more than enough, knowing that the default timeout is around 5 seconds IIRC.
That's why I have doubts whether the value in your config is actually being applied.
Can you try using the following config: presto.s3.socket-timeout: 600000 ?
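Applied to the flink-conf.yaml from the question, the suggestion above would look like this. This is only a sketch: the presto.s3. prefix is the one this answer proposes, and Presto parses these timeouts as durations, so a unit-suffixed value (e.g. 10m) may be required instead of bare milliseconds — verify against your Flink/Presto version.

```yaml
# flink-conf.yaml (sketch) -- key prefix as suggested in this answer
presto.s3.socket-timeout: 600000
# presto.s3.socket-timeout: 10m   # alternative, if a duration unit is required
```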
Answer 2
Score: 0
Per https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/filesystems/s3/, the Flink Presto implementation uses the same configuration keys as the Presto file system. These are documented at https://prestodb.io/docs/0.272/connector/hive.html#amazon-s3-configuration. The configuration keys differ between Presto and Hadoop. In Presto:
there is no s3.connection.timeout, but there is s3.connect-timeout
there is no s3.socket.timeout, but there is s3.socket-timeout
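Under that key mapping, the S3 section of the original flink-conf.yaml would be rewritten roughly as follows. This is a sketch, assuming the hyphenated Presto-style names; the 10m values are an assumed duration-format equivalent of 600000 ms (Presto timeouts take unit-suffixed durations) and should be checked against the docs linked above.

```yaml
# Checkpoint/savepoint locations, unchanged from the question
state.backend: filesystem
state.checkpoints.dir: s3p://gamma-flink-test-checkpoint/checkpoints/
state.savepoints.dir: s3p://gamma-flink-test-checkpoint/savepoints/

# Presto-style hyphenated keys instead of the Hadoop-style dotted ones;
# 10m (10 minutes) is an assumed equivalent of the 600000 ms in the question
s3.connect-timeout: 10m
s3.socket-timeout: 10m
```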
Comments