ClassNotFoundException: org.apache.spark.sql.connector.read.SupportsRuntimeFiltering on Google Dataproc cluster using Airflow

Question

Hi, I have set up a Dataproc cluster and I am trying to train a Spark pipeline, but I keep getting this class-not-found exception. Here is the error log:

    23/06/07 19:27:44 INFO org.apache.spark.SparkEnv: Registering MapOutputTracker
    23/06/07 19:27:44 INFO org.apache.spark.SparkEnv: Registering BlockManagerMaster
    23/06/07 19:27:44 INFO org.apache.spark.SparkEnv: Registering BlockManagerMasterHeartbeat
    23/06/07 19:27:44 INFO org.apache.spark.SparkEnv: Registering OutputCommitCoordinator
    23/06/07 19:27:44 INFO org.sparkproject.jetty.util.log: Logging initialized @5840ms to org.sparkproject.jetty.util.log.Slf4jLog
    23/06/07 19:27:44 INFO org.sparkproject.jetty.server.Server: jetty-9.4.40.v20210413; built: 2021-04-13T20:42:42.668Z; git: b881a572662e1943a14ae12e7e1207989f218b74; jvm 1.8.0_372-b07
    23/06/07 19:27:44 INFO org.sparkproject.jetty.server.Server: Started @5943ms
    23/06/07 19:27:44 INFO org.sparkproject.jetty.server.AbstractConnector: Started ServerConnector@49eefd4d{HTTP/1.1, (http/1.1)}{0.0.0.0:42813}
    23/06/07 19:27:46 INFO com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.gcsio.GoogleCloudStorageImpl: Ignoring exception of type GoogleJsonResponseException; verified object already exists with desired state.
    Traceback (most recent call last):
      File "/tmp/--/main.py", line 31, in <module>
        df = string_index_data(df)
      File "/tmp/--/sss.py", line 47, in string_index_data
        pipeline = pipeline.fit(df)
      File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/ml/base.py", line 161, in fit
      File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/ml/pipeline.py", line 114, in _fit
      File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/ml/base.py", line 161, in fit
      File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/ml/wrapper.py", line 335, in _fit
      File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/ml/wrapper.py", line 332, in _fit_java
      File "/usr/lib/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1304, in __call__
      File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 111, in deco
      File "/usr/lib/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py", line 326, in get_return_value
    py4j.protocol.Py4JJavaError: An error occurred while calling o112.fit.
    : java.lang.NoClassDefFoundError: org/apache/spark/sql/connector/read/SupportsRuntimeFiltering
        at java.lang.ClassLoader.defineClass1(Native Method)
        at java.lang.ClassLoader.defineClass(ClassLoader.java:756)
        at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
        at java.net.URLClassLoader.defineClass(URLClassLoader.java:473)
        at java.net.URLClassLoader.access$100(URLClassLoader.java:74)
        at java.net.URLClassLoader$1.run(URLClassLoader.java:369)
        at java.net.URLClassLoader$1.run(URLClassLoader.java:363)
        at java.security.AccessController.doPrivileged(Native Method)
        at java.net.URLClassLoader.findClass(URLClassLoader.java:362)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
        at java.lang.ClassLoader.defineClass1(Native Method)
        at java.lang.ClassLoader.defineClass(ClassLoader.java:756)
        at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
        at java.net.URLClassLoader.defineClass(URLClassLoader.java:473)
        at java.net.URLClassLoader.access$100(URLClassLoader.java:74)
        at java.net.URLClassLoader$1.run(URLClassLoader.java:369)
        at java.net.URLClassLoader$1.run(URLClassLoader.java:363)
        at java.security.AccessController.doPrivileged(Native Method)
        at java.net.URLClassLoader.findClass(URLClassLoader.java:362)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
        at com.google.cloud.spark.bigquery.v2.Spark33BigQueryTable.newScanBuilder(Spark33BigQueryTable.java:34)
        at org.apache.spark.sql.execution.datasources.v2.V2ScanRelationPushDown$$anonfun$apply$1.applyOrElse(V2ScanRelationPushDown.scala:35)
        at org.apache.spark.sql.execution.datasources.v2.V2ScanRelationPushDown$$anonfun$apply$1.applyOrElse(V2ScanRelationPushDown.scala:33)
        at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDown$1(TreeNode.scala:318)
        at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:74)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:318)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDown(LogicalPlan.scala:29)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDown(AnalysisHelper.scala:173)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDown$(AnalysisHelper.scala:171)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDown(LogicalPlan.scala:29)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDown(LogicalPlan.scala:29)
        at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDown$3(TreeNode.scala:323)
        at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$mapChildren$1(TreeNode.scala:408)
        at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:244)
        at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:406)
        at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:359)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:323)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDown(LogicalPlan.scala:29)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDown(AnalysisHelper.scala:173)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDown$(AnalysisHelper.scala:171)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDown(LogicalPlan.scala:29)
        at org.apache.spark.sql.execution.datasources.v2.V2ScanRelationPushDown$.apply(V2ScanRelationPushDown.scala:33)
        at org.apache.spark.sql.execution.datasources.v2.V2ScanRelationPushDown$.apply(V2ScanRelationPushDown.scala:30)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$2(RuleExecutor.scala:216)
        at scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126)
        at scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122)
        at scala.collection.immutable.List.foldLeft(List.scala:91)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:213)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1$adapted(RuleExecutor.scala:205)
        at scala.collection.immutable.List.foreach(List.scala:431)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:205)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$executeAndTrack$1(RuleExecutor.scala:183)
        at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:88)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.executeAndTrack(RuleExecutor.scala:183)
        at org.apache.spark.sql.execution.QueryExecution.$anonfun$optimizedPlan$1(QueryExecution.scala:88)
        at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
        at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:144)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
        at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:144)
        at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:85)
        at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:85)
        at org.apache.spark.sql.execution.QueryExecution.assertOptimized(QueryExecution.scala:96)
        at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:114)
        at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:111)
        at org.apache.spark.sql.execution.QueryExecution.simpleString(QueryExecution.scala:162)
        at org.apache.spark.sql.execution.QueryExecution.org$apache$spark$sql$execution$QueryExecution$$explainString(QueryExecution.scala:207)
        at org.apache.spark.sql.execution.QueryExecution.explainString(QueryExecution.scala:176)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:98)
        at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
        at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3698)
        at org.apache.spark.sql.Dataset.collect(Dataset.scala:2978)
        at org.apache.spark.ml.feature.StringIndexer.countByValue(StringIndexer.scala:204)
        at org.apache.spark.ml.feature.StringIndexer.sortByFreq(StringIndexer.scala:212)
        at org.apache.spark.ml.feature.StringIndexer.fit(StringIndexer.scala:242)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:282)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:238)
        at java.lang.Thread.run(Thread.java:750)
    Caused by: java.lang.ClassNotFoundException: org.apache.spark.sql.connector.read.SupportsRuntimeFiltering
        at java.net.URLClassLoader.findClass(URLClassLoader.java:387)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
        ... 90 more
    23/06/07 19:27:52 INFO org.sparkproject.jetty.server.AbstractConnector: Stopped Spark@49eefd4d{HTTP/1.1, (http/1.1)}{0.0.0.0:0}

I have added a download of the spark-sql jar to the cluster initialization script:

    wget -U "Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; SV1)" -O ${CURRENT_DIR}/jars/spark-sql_2.13-3.4.0.jar https://repo1.maven.org/maven2/org/apache/spark/spark-sql_2.13/3.4.0/spark-sql_2.13-3.4.0.jar

I also added it to the PySpark submit arguments:

    os.environ["PYSPARK_SUBMIT_ARGS"] = '--jars ./jars/mlflow-spark-1.30.0.jar,./jars/spark-3.3-bigquery-0.30.0.jar,./jars/scala-library-2.13.1.jar,./jars/s3mock_2.12-0.1.8.jar,./jars/spark-sql_2.13-3.4.0.jar pyspark-shell'

I also tried passing it in the DataprocSubmitJobOperator:

    pyspark_job = {
        "reference": {"project_id": PROJECT_ID},
        "placement": {"cluster_name": CLUSTER_NAME},
        "pyspark_job": {
            "main_python_file_uri": "gs://bucket-name/dependencies/main.py",
            "python_file_uris": [f"gs://bucket-name/dependencies/config.py", f"gs://bucket-name/dependencies/util.py"],
            "jar_file_uris": ["gs://bucket-name/dependencies/mlflow-spark-1.30.0.jar",
                              "gs://bucket-name/dependencies/spark-3.3-bigquery-0.30.0.jar",
                              "gs://bucket-name/dependencies/scala-library-2.13.1.jar",
                              "gs://bucket-name/dependencies/s3mock_2.12-0.1.8.jar",
                              "gs://bucket-name/dependencies/spark-sql_2.13-3.4.0.jar"],
        },
    }
    pyspark_task = DataprocSubmitJobOperator(
        task_id="pyspark_task", job=pyspark_job, region=REGION, project_id=PROJECT_ID, dag=dag_next_purchase
    )

As described above, I expected the pipeline to run correctly.

Answer 1

Score: 0

Since you are not setting a specific Dataproc image version, the default is Dataproc image 2.0, which comes with Spark 3.1. That Spark version does not contain this class: SupportsRuntimeFiltering is a DataSource V2 interface that only appeared in later Spark releases, and the spark-3.3-bigquery connector you are loading is built against the Spark 3.3 API.

Please change the definition to

    pyspark_job = {
        "reference": {"project_id": PROJECT_ID},
        "placement": {"cluster_name": CLUSTER_NAME},
        "pyspark_job": {
            "main_python_file_uri": "gs://bucket-name/dependencies/main.py",
            "python_file_uris": [f"gs://bucket-name/dependencies/config.py", f"gs://bucket-name/dependencies/util.py"],
            "jar_file_uris": ["gs://bucket-name/dependencies/mlflow-spark-1.30.0.jar",
                              "gs://bucket-name/dependencies/spark-3.1-bigquery-0.31.1.jar",
                              "gs://bucket-name/dependencies/s3mock_2.12-0.1.8.jar"],
        },
    }

Also, notice that I've removed the jars Spark already provides - scala-library and spark-sql. Including them may lead to other issues, especially since the ones you downloaded target Scala 2.13 while the Spark 3.1 in Dataproc 2.0 is built for Scala 2.12.
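
Alternatively, if you would rather keep the Spark 3.3 build of the BigQuery connector, you could pin a newer Dataproc image (the 2.1 images bundle Spark 3.3) when the cluster is created. Below is a minimal sketch using Airflow's DataprocCreateClusterOperator; the image version string, machine types and worker counts are illustrative assumptions, and PROJECT_ID, REGION, CLUSTER_NAME and dag_next_purchase are assumed to be the same constants used in your DAG:

    from airflow.providers.google.cloud.operators.dataproc import DataprocCreateClusterOperator

    # Illustrative cluster config: the important field is "image_version", which
    # pins the Dataproc image (2.1 bundles Spark 3.3) instead of falling back to
    # the default 2.0 image that ships Spark 3.1.
    CLUSTER_CONFIG = {
        "software_config": {"image_version": "2.1-debian11"},
        "master_config": {"num_instances": 1, "machine_type_uri": "n1-standard-4"},
        "worker_config": {"num_instances": 2, "machine_type_uri": "n1-standard-4"},
    }

    create_cluster = DataprocCreateClusterOperator(
        task_id="create_dataproc_cluster",
        project_id=PROJECT_ID,
        region=REGION,
        cluster_name=CLUSTER_NAME,
        cluster_config=CLUSTER_CONFIG,
        dag=dag_next_purchase,
    )

    # Run the submit task only after the cluster exists:
    # create_cluster >> pyspark_task

With a 2.1 image the cluster's Spark version should match what spark-3.3-bigquery-0.30.0.jar expects, so the SupportsRuntimeFiltering class would resolve.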
