java.io.FileNotFoundException error in Apache Spark even though my file exists


Question

This is a summary of the file-not-found problem in Spark and the related code; you want to know what is going wrong. Key facts about the issue:

  • Problem description: the application downloads a file and then fails to read it because Spark reports that the file does not exist.
  • Error message: java.io.FileNotFoundException: File file:/app/data-Feb-19-2023_131049.json does not exist

The post below provides the code that downloads the file and reads it, as well as the SparkApplication configuration.

The problem most likely lies in the file path or in access permissions. To narrow it down, consider the following points:

  1. Check the file path: make sure the path you pass to Spark is correct. The code already uses a find_files helper to locate the file, but confirm that the returned path is the one Spark actually receives and that the file really exists there.

  2. Access permissions: make sure the Spark application has sufficient permission to read the file. An application running in a Kubernetes cluster may need additional permission configuration before it can read local files.

  3. Logs and debugging: inspect the application logs for other error messages or exceptions; they can help pinpoint the root cause.

  4. File storage location: if the file is stored in MinIO, make sure the MinIO service is configured correctly and that the application can reach it.

Checking these points should reveal the root cause. If the problem persists, provide more details so it can be investigated further. A minimal sketch of the first two checks follows.
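The sketch below covers the first two checks (path and read permission), run on whichever process calls spark.read; the function name and the example path are illustrative, not part of the original code:

    import os

    def preflight_check(path: str) -> None:
        """Fail fast with a clear message if this process cannot see or read the file."""
        # 1. Does the path exist on the filesystem of the process running this code?
        if not os.path.isfile(path):
            raise FileNotFoundError(f"{path} does not exist on this node")
        # 2. Can this process actually read it?
        if not os.access(path, os.R_OK):
            raise PermissionError(f"{path} exists but is not readable by this process")
        print(f"OK: {path} ({os.path.getsize(path)} bytes) is readable here")

    # Illustrative call:
    # preflight_check("/app/data-Feb-19-2023_131049.json")

Note that this only verifies the file on the node where the check runs (the driver, in this case); in cluster mode the executors run on other pods, which is exactly what the answer below addresses.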

English:

I'm new to Spark and working on a POC to download a file and then read it. However, I am running into an issue where Spark reports that the file doesn't exist.

> java.io.FileNotFoundException: File file:/app/data-Feb-19-2023_131049.json does not exist

But when I print the path of the file, I can see that the file exists and that the path is correct.

This is the output:

    23/02/19 13:10:46 INFO BlockManagerMasterEndpoint: Registering block manager 10.14.142.21:37515 with 2.2 GiB RAM, BlockManagerId(1, 10.14.142.21, 37515, None)
    FILE IS DOWNLOADED
    ['/app/data-Feb-19-2023_131049.json']
    23/02/19 13:10:49 INFO SharedState: Setting hive.metastore.warehouse.dir ('null') to the value of spark.sql.warehouse.dir.
    23/02/19 13:10:49 INFO SharedState: Warehouse path is 'file:/app/spark-warehouse'.
    23/02/19 13:10:50 INFO InMemoryFileIndex: It took 39 ms to list leaf files for 1 paths.
    23/02/19 13:10:51 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 206.6 KiB, free 1048.6 MiB)
    23/02/19 13:10:51 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 35.8 KiB, free 1048.6 MiB)
    23/02/19 13:10:51 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on experian-el-d41b428669cc1e8e-driver-svc.environments-quin-dev-1.svc:7079 (size: 35.8 KiB, free: 1048.8 MiB)
    23/02/19 13:10:51 INFO SparkContext: Created broadcast 0 from json at <unknown>:0
    23/02/19 13:10:51 INFO FileInputFormat: Total input files to process : 1
    23/02/19 13:10:51 INFO FileInputFormat: Total input files to process : 1
    23/02/19 13:10:51 INFO SparkContext: Starting job: json at <unknown>:0
    23/02/19 13:10:51 INFO DAGScheduler: Got job 0 (json at <unknown>:0) with 1 output partitions
    23/02/19 13:10:51 INFO DAGScheduler: Final stage: ResultStage 0 (json at <unknown>:0)
    23/02/19 13:10:51 INFO DAGScheduler: Parents of final stage: List()
    23/02/19 13:10:51 INFO DAGScheduler: Missing parents: List()
    23/02/19 13:10:51 INFO DAGScheduler: Submitting ResultStage 0 (MapPartitionsRDD[2] at json at <unknown>:0), which has no missing parents
    23/02/19 13:10:51 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 9.0 KiB, free 1048.6 MiB)
    23/02/19 13:10:51 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 4.8 KiB, free 1048.5 MiB)
    23/02/19 13:10:51 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on experian-el-d41b428669cc1e8e-driver-svc.environments-quin-dev-1.svc:7079 (size: 4.8 KiB, free: 1048.8 MiB)
    23/02/19 13:10:51 INFO SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:1513
    23/02/19 13:10:51 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 0 (MapPartitionsRDD[2] at json at <unknown>:0) (first 15 tasks are for partitions Vector(0))
    23/02/19 13:10:51 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks resource profile 0
    23/02/19 13:10:51 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0) (10.14.142.21, executor 1, partition 0, PROCESS_LOCAL, 4602 bytes) taskResourceAssignments Map()
    23/02/19 13:10:52 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on 10.14.142.21:37515 (size: 4.8 KiB, free: 2.2 GiB)
    23/02/19 13:10:52 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on 10.14.142.21:37515 (size: 35.8 KiB, free: 2.2 GiB)
    23/02/19 13:10:52 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0) (10.14.142.21 executor 1): java.io.FileNotFoundException: File file:/app/data-Feb-19-2023_131049.json does not exist
        at org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:779)
        at org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:1100)
        at org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:769)
        at org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:462)
        at org.apache.hadoop.fs.ChecksumFileSystem$ChecksumFSInputChecker.<init>(ChecksumFileSystem.java:160)
        at org.apache.hadoop.fs.ChecksumFileSystem.open(ChecksumFileSystem.java:372)
        at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:976)
        at org.apache.spark.sql.execution.datasources.CodecStreams$.createInputStream(CodecStreams.scala:40)
        at org.apache.spark.sql.execution.datasources.CodecStreams$.createInputStreamWithCloseResource(CodecStreams.scala:52)
        at org.apache.spark.sql.execution.datasources.json.MultiLineJsonDataSource$.dataToInputStream(JsonDataSource.scala:195)
        at org.apache.spark.sql.execution.datasources.json.MultiLineJsonDataSource$.createParser(JsonDataSource.scala:199)
        at org.apache.spark.sql.execution.datasources.json.MultiLineJsonDataSource$.$anonfun$infer$4(JsonDataSource.scala:165)
        at org.apache.spark.sql.catalyst.json.JsonInferSchema.$anonfun$infer$3(JsonInferSchema.scala:86)
        at org.apache.spark.util.Utils$.tryWithResource(Utils.scala:2763)
        at org.apache.spark.sql.catalyst.json.JsonInferSchema.$anonfun$infer$2(JsonInferSchema.scala:86)
        at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
        at scala.collection.Iterator.isEmpty(Iterator.scala:387)
        at scala.collection.Iterator.isEmpty$(Iterator.scala:387)
        at scala.collection.AbstractIterator.isEmpty(Iterator.scala:1431)
        at scala.collection.TraversableOnce.reduceLeftOption(TraversableOnce.scala:249)
        at scala.collection.TraversableOnce.reduceLeftOption$(TraversableOnce.scala:248)
        at scala.collection.AbstractIterator.reduceLeftOption(Iterator.scala:1431)
        at scala.collection.TraversableOnce.reduceOption(TraversableOnce.scala:256)
        at scala.collection.TraversableOnce.reduceOption$(TraversableOnce.scala:256)
        at scala.collection.AbstractIterator.reduceOption(Iterator.scala:1431)
        at org.apache.spark.sql.catalyst.json.JsonInferSchema.$anonfun$infer$1(JsonInferSchema.scala:103)
        at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:855)
        at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:855)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:136)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
        at java.base/java.lang.Thread.run(Unknown Source)
    23/02/19 13:10:52 INFO TaskSetManager: Starting task 0.1 in stage 0.0 (TID 1) (10.14.142.21, executor 1, partition 0, PROCESS_LOCAL, 4602 bytes) taskResourceAssignments Map()
    23/02/19 13:10:52 INFO TaskSetManager: Lost task 0.1 in stage 0.0 (TID 1) on 10.14.142.21, executor 1: java.io.FileNotFoundException (File file:/app/data-Feb-19-2023_131049.json does not exist) [duplicate 1]
    23/02/19 13:10:52 INFO TaskSetManager: Starting task 0.2 in stage 0.0 (TID 2) (10.14.142.21, executor 1, partition 0, PROCESS_LOCAL, 4602 bytes) taskResourceAssignments Map()
    23/02/19 13:10:52 INFO TaskSetManager: Lost task 0.2 in stage 0.0 (TID 2) on 10.14.142.21, executor 1: java.io.FileNotFoundException (File file:/app/data-Feb-19-2023_131049.json does not exist) [duplicate 2]
    23/02/19 13:10:52 INFO TaskSetManager: Starting task 0.3 in stage 0.0 (TID 3) (10.14.142.21, executor 1, partition 0, PROCESS_LOCAL, 4602 bytes) taskResourceAssignments Map()
    23/02/19 13:10:52 INFO TaskSetManager: Lost task 0.3 in stage 0.0 (TID 3) on 10.14.142.21, executor 1: java.io.FileNotFoundException (File file:/app/data-Feb-19-2023_131049.json does not exist) [duplicate 3]
    23/02/19 13:10:52 ERROR TaskSetManager: Task 0 in stage 0.0 failed 4 times; aborting job
    23/02/19 13:10:52 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
    23/02/19 13:10:52 INFO TaskSchedulerImpl: Cancelling stage 0
    23/02/19 13:10:52 INFO TaskSchedulerImpl: Killing all running tasks in stage 0: Stage cancelled
    23/02/19 13:10:52 INFO DAGScheduler: ResultStage 0 (json at <unknown>:0) failed in 1.128 s due to Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3) (10.14.142.21 executor 1): java.io.FileNotFoundException: File file:/app/data-Feb-19-2023_131049.json does not exist
        at org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:779)
        at org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:1100)
        at org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:769)
        at org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:462)
        at org.apache.hadoop.fs.ChecksumFileSystem$ChecksumFSInputChecker.<init>(ChecksumFileSystem.java:160)
        at org.apache.hadoop.fs.ChecksumFileSystem.open(ChecksumFileSystem.java:372)
        at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:976)
        at org.apache.spark.sql.execution.datasources.CodecStreams$.createInputStream(CodecStreams.scala:40)
        at org.apache.spark.sql.execution.datasources.CodecStreams$.createInputStreamWithCloseResource(CodecStreams.scala:52)
        at org.apache.spark.sql.execution.datasources.json.MultiLineJsonDataSource$.dataToInputStream(JsonDataSource.scala:195)
        at org.apache.spark.sql.execution.datasources.json.MultiLineJsonDataSource$.createParser(JsonDataSource.scala:199)
        at org.apache.spark.sql.execution.datasources.json.MultiLineJsonDataSource$.$anonfun$infer$4(JsonDataSource.scala:165)
        at org.apache.spark.sql.catalyst.json.JsonInferSchema.$anonfun$infer$3(JsonInferSchema.scala:86)
        at org.apache.spark.util.Utils$.tryWithResource(Utils.scala:2763)
        at org.apache.spark.sql.catalyst.json.JsonInferSchema.$anonfun$infer$2(JsonInferSchema.scala:86)
        at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
        at scala.collection.Iterator.isEmpty(Iterator.scala:387)
        at scala.collection.Iterator.isEmpty$(Iterator.scala:387)
        at scala.collection.AbstractIterator.isEmpty(Iterator.scala:1431)
        at scala.collection.TraversableOnce.reduceLeftOption(TraversableOnce.scala:249)
        at scala.collection.TraversableOnce.reduceLeftOption$(TraversableOnce.scala:248)
        at scala.collection.AbstractIterator.reduceLeftOption(Iterator.scala:1431)
        at scala.collection.TraversableOnce.reduceOption(TraversableOnce.scala:256)
        at scala.collection.TraversableOnce.reduceOption$(TraversableOnce.scala:256)
        at scala.collection.AbstractIterator.reduceOption(Iterator.scala:1431)
        at org.apache.spark.sql.catalyst.json.JsonInferSchema.$anonfun$infer$1(JsonInferSchema.scala:103)
        at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:855)
        at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:855)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:136)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
        at java.base/java.lang.Thread.run(Unknown Source)

This is my code to download the file and print its path:

    # These methods belong to a helper class; they rely on `import os, json, time, requests`
    # and a module-level `logger` being defined elsewhere in the application.

    def find_files(self, filename, search_path):
        result = []
        # Walking top-down from the root
        for root, dirs, files in os.walk(search_path):
            if filename in files:
                result.append(os.path.join(root, filename))
        return result

    def downloadData(self, access_token, data):
        headers = {
            'Content-Type': 'application/json',
            'Charset': 'UTF-8',
            'Authorization': f'Bearer {access_token}'
        }
        try:
            response = requests.post(self.kyc_url, data=json.dumps(data), headers=headers)
            response.raise_for_status()
            logger.debug("received kyc data")
            response_filename = ("data-" + time.strftime('%b-%d-%Y_%H%M%S', time.localtime()) + ".json")
            with open(response_filename, 'w', encoding='utf-8') as f:
                json.dump(response.json(), f, ensure_ascii=False, indent=4)
            print("FILE IS DOWNLOADED")
            print(self.find_files(response_filename, "/"))
        except requests.exceptions.HTTPError as err:
            logger.error("failed to fetch kyc data")
            raise SystemExit(err)
        return response_filename

This is my code to read the file and upload it to MinIO:

    def load(spark: SparkSession, json_file_path: str, destination_path: str) -> None:
        df = spark.read.option("multiline", "true").json(json_file_path)
        df.write.format("delta").save(f"s3a://{destination_path}")

I'm running Spark on Kubernetes with the Spark Operator.

This is my SparkApplication manifest

    apiVersion: "sparkoperator.k8s.io/v1beta2"
    kind: SparkApplication
    metadata:
      name: myApp
      namespace: demo
    spec:
      type: Python
      pythonVersion: "3"
      mode: cluster
      image: "myImage"
      imagePullPolicy: Always
      mainApplicationFile: local:///app/main.py
      sparkVersion: "3.3.1"
      restartPolicy:
        type: OnFailure
        onFailureRetries: 3
        onFailureRetryInterval: 10
        onSubmissionFailureRetries: 5
        onSubmissionFailureRetryInterval: 20
      timeToLiveSeconds: 86400
      deps:
        packages:
          - io.delta:delta-core_2.12:2.2.0
          - org.apache.hadoop:hadoop-aws:3.3.1
      driver:
        env:
          - name: NAMESPACE
            value: demo
        cores: 2
        coreLimit: "2000m"
        memory: "2048m"
        labels:
          version: 3.3.1
        serviceAccount: spark-driver
      executor:
        cores: 4
        instances: 1
        memory: "4096m"
        coreRequest: "500m"
        coreLimit: "4000m"
        labels:
          version: 3.3.1
      dynamicAllocation:
        enabled: false

Can someone please point out what I am doing wrong?

Thank you

Answer 1

Score: 1

If you are running in cluster mode, your input files need to be on a shared file system such as HDFS or S3, not on the local file system, since both the driver and the executors must be able to access the input file.
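A minimal sketch of that approach, staying with the MinIO/s3a setup the question already uses for the Delta write. The endpoint, credential environment variables, and bucket name are assumptions for illustration (and the bucket is assumed to already exist); it uploads the driver-local JSON to MinIO with boto3 and then reads it back through the s3a connector that the hadoop-aws dependency in the manifest provides:

    import os
    import boto3
    from pyspark.sql import SparkSession

    # Illustrative values; take the real ones from your MinIO deployment.
    MINIO_ENDPOINT = os.environ.get("MINIO_ENDPOINT", "http://minio.demo.svc:9000")
    MINIO_ACCESS_KEY = os.environ["MINIO_ACCESS_KEY"]
    MINIO_SECRET_KEY = os.environ["MINIO_SECRET_KEY"]
    BUCKET = "kyc-data"  # assumed to exist

    # Point Spark's s3a connector at MinIO (these can also go into sparkConf in the manifest).
    spark = (
        SparkSession.builder
        .appName("kyc-poc")
        .config("spark.hadoop.fs.s3a.endpoint", MINIO_ENDPOINT)
        .config("spark.hadoop.fs.s3a.access.key", MINIO_ACCESS_KEY)
        .config("spark.hadoop.fs.s3a.secret.key", MINIO_SECRET_KEY)
        .config("spark.hadoop.fs.s3a.path.style.access", "true")
        .getOrCreate()
    )

    def stage_and_read(local_path: str):
        """Upload the driver-local JSON to MinIO, then read it from a path every pod can reach."""
        key = os.path.basename(local_path)
        s3 = boto3.client(
            "s3",
            endpoint_url=MINIO_ENDPOINT,
            aws_access_key_id=MINIO_ACCESS_KEY,
            aws_secret_access_key=MINIO_SECRET_KEY,
        )
        s3.upload_file(local_path, BUCKET, key)
        # Executors fetch the object from MinIO themselves; no local copy is needed on their pods.
        return spark.read.option("multiline", "true").json(f"s3a://{BUCKET}/{key}")

    # Illustrative usage:
    # df = stage_and_read("/app/data-Feb-19-2023_131049.json")
    # df.write.format("delta").save("s3a://kyc-data/kyc-delta")

Staging the input in MinIO also keeps the read path consistent with the s3a:// Delta write that the load() function in the question already performs.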

huangapple
  • Posted on 2023-02-19 21:21:01
  • When reposting, please keep the original link: https://go.coder-hub.com/75500426.html