ClassNotFoundException: org.apache.spark.sql.connector.read.SupportsRuntimeFiltering on Google Dataproc cluster using Airflow


Question


Hi, I have set up a Dataproc cluster and I am trying to train a Spark pipeline, but I keep getting this class-not-found exception.
Here is the error log:

23/06/07 19:27:44 INFO org.apache.spark.SparkEnv: Registering MapOutputTracker
23/06/07 19:27:44 INFO org.apache.spark.SparkEnv: Registering BlockManagerMaster
23/06/07 19:27:44 INFO org.apache.spark.SparkEnv: Registering BlockManagerMasterHeartbeat
23/06/07 19:27:44 INFO org.apache.spark.SparkEnv: Registering OutputCommitCoordinator
23/06/07 19:27:44 INFO org.sparkproject.jetty.util.log: Logging initialized @5840ms to org.sparkproject.jetty.util.log.Slf4jLog
23/06/07 19:27:44 INFO org.sparkproject.jetty.server.Server: jetty-9.4.40.v20210413; built: 2021-04-13T20:42:42.668Z; git: b881a572662e1943a14ae12e7e1207989f218b74; jvm 1.8.0_372-b07
23/06/07 19:27:44 INFO org.sparkproject.jetty.server.Server: Started @5943ms
23/06/07 19:27:44 INFO org.sparkproject.jetty.server.AbstractConnector: Started ServerConnector@49eefd4d{HTTP/1.1, (http/1.1)}{0.0.0.0:42813}
23/06/07 19:27:46 INFO com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.gcsio.GoogleCloudStorageImpl: Ignoring exception of type GoogleJsonResponseException; verified object already exists with desired state.
Traceback (most recent call last):
File "/tmp/--/main.py", line 31, in <module>
df = string_index_data(df)
File "/tmp/--/sss.py", line 47, in string_index_data
pipeline = pipeline.fit(df)
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/ml/base.py", line 161, in fit
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/ml/pipeline.py", line 114, in _fit
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/ml/base.py", line 161, in fit
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/ml/wrapper.py", line 335, in _fit
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/ml/wrapper.py", line 332, in _fit_java
File "/usr/lib/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1304, in __call__
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 111, in deco
File "/usr/lib/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py", line 326, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o112.fit.
: java.lang.NoClassDefFoundError: org/apache/spark/sql/connector/read/SupportsRuntimeFiltering
at java.lang.ClassLoader.defineClass1(Native Method)
at java.lang.ClassLoader.defineClass(ClassLoader.java:756)
at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
at java.net.URLClassLoader.defineClass(URLClassLoader.java:473)
at java.net.URLClassLoader.access$100(URLClassLoader.java:74)
at java.net.URLClassLoader$1.run(URLClassLoader.java:369)
at java.net.URLClassLoader$1.run(URLClassLoader.java:363)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:362)
at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
at java.lang.ClassLoader.defineClass1(Native Method)
at java.lang.ClassLoader.defineClass(ClassLoader.java:756)
at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
at java.net.URLClassLoader.defineClass(URLClassLoader.java:473)
at java.net.URLClassLoader.access$100(URLClassLoader.java:74)
at java.net.URLClassLoader$1.run(URLClassLoader.java:369)
at java.net.URLClassLoader$1.run(URLClassLoader.java:363)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:362)
at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
at com.google.cloud.spark.bigquery.v2.Spark33BigQueryTable.newScanBuilder(Spark33BigQueryTable.java:34)
at org.apache.spark.sql.execution.datasources.v2.V2ScanRelationPushDown$$anonfun$apply$1.applyOrElse(V2ScanRelationPushDown.scala:35)
at org.apache.spark.sql.execution.datasources.v2.V2ScanRelationPushDown$$anonfun$apply$1.applyOrElse(V2ScanRelationPushDown.scala:33)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDown$1(TreeNode.scala:318)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:74)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:318)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDown(LogicalPlan.scala:29)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDown(AnalysisHelper.scala:173)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDown$(AnalysisHelper.scala:171)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDown(LogicalPlan.scala:29)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDown(LogicalPlan.scala:29)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDown$3(TreeNode.scala:323)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$mapChildren$1(TreeNode.scala:408)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:244)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:406)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:359)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:323)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDown(LogicalPlan.scala:29)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDown(AnalysisHelper.scala:173)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDown$(AnalysisHelper.scala:171)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDown(LogicalPlan.scala:29)
at org.apache.spark.sql.execution.datasources.v2.V2ScanRelationPushDown$.apply(V2ScanRelationPushDown.scala:33)
at org.apache.spark.sql.execution.datasources.v2.V2ScanRelationPushDown$.apply(V2ScanRelationPushDown.scala:30)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$2(RuleExecutor.scala:216)
at scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126)
at scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122)
at scala.collection.immutable.List.foldLeft(List.scala:91)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:213)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1$adapted(RuleExecutor.scala:205)
at scala.collection.immutable.List.foreach(List.scala:431)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:205)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$executeAndTrack$1(RuleExecutor.scala:183)
at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:88)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.executeAndTrack(RuleExecutor.scala:183)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$optimizedPlan$1(QueryExecution.scala:88)
at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:144)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:144)
at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:85)
at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:85)
at org.apache.spark.sql.execution.QueryExecution.assertOptimized(QueryExecution.scala:96)
at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:114)
at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:111)
at org.apache.spark.sql.execution.QueryExecution.simpleString(QueryExecution.scala:162)
at org.apache.spark.sql.execution.QueryExecution.org$apache$spark$sql$execution$QueryExecution$$explainString(QueryExecution.scala:207)
at org.apache.spark.sql.execution.QueryExecution.explainString(QueryExecution.scala:176)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:98)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3698)
at org.apache.spark.sql.Dataset.collect(Dataset.scala:2978)
at org.apache.spark.ml.feature.StringIndexer.countByValue(StringIndexer.scala:204)
at org.apache.spark.ml.feature.StringIndexer.sortByFreq(StringIndexer.scala:212)
at org.apache.spark.ml.feature.StringIndexer.fit(StringIndexer.scala:242)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.ClassNotFoundException: org.apache.spark.sql.connector.read.SupportsRuntimeFiltering
at java.net.URLClassLoader.findClass(URLClassLoader.java:387)
at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
... 90 more
23/06/07 19:27:52 INFO org.sparkproject.jetty.server.AbstractConnector: Stopped Spark@49eefd4d{HTTP/1.1, (http/1.1)}{0.0.0.0:0}

I have added a download of the spark-sql jar to the initialization script:


wget -U "Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; SV1)" -O ${CURRENT_DIR}/jars/spark-sql_2.13-3.4.0.jar https://repo1.maven.org/maven2/org/apache/spark/spark-sql_2.13/3.4.0/spark-sql_2.13-3.4.0.jar

I also added it to the PySpark submit arguments:

os.environ["PYSPARK_SUBMIT_ARGS"] = '--jars ./jars/mlflow-spark-1.30.0.jar,./jars/spark-3.3-bigquery-0.30.0.jar,./jars/scala-library-2.13.1.jar,./jars/s3mock_2.12-0.1.8.jar,./jars/spark-sql_2.13-3.4.0.jar  pyspark-shell'

I also tried passing it in the Dataproc submit job operator:

pyspark_job = {
    "reference": {"project_id": PROJECT_ID},
    "placement": {"cluster_name": CLUSTER_NAME},
    "pyspark_job": {
        "main_python_file_uri": "gs://bucket-name/dependencies/main.py",
        "python_file_uris": [
            "gs://bucket-name/dependencies/config.py",
            "gs://bucket-name/dependencies/util.py",
        ],
        "jar_file_uris": [
            "gs://bucket-name/dependencies/mlflow-spark-1.30.0.jar",
            "gs://bucket-name/dependencies/spark-3.3-bigquery-0.30.0.jar",
            "gs://bucket-name/dependencies/scala-library-2.13.1.jar",
            "gs://bucket-name/dependencies/s3mock_2.12-0.1.8.jar",
            "gs://bucket-name/dependencies/spark-sql_2.13-3.4.0.jar",
        ],
    },
}
pyspark_task = DataprocSubmitJobOperator(
    task_id="pyspark_task",
    job=pyspark_job,
    region=REGION,
    project_id=PROJECT_ID,
    dag=dag_next_purchase,
)

I expected my pipeline to run correctly.

Answer 1

Score: 0


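Since you are not setting a specific Dataproc image version, the cluster defaults to Dataproc image 2.0, which comes with Spark 3.1. That Spark version does not contain this class: SupportsRuntimeFiltering was added in Spark 3.2, and the spark-3.3-bigquery connector you are loading (note Spark33BigQueryTable in the stack trace) is built against it.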
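To confirm this diagnosis before changing any jars, a small version check can help. The following is only a sketch: the helper names are made up for illustration, and the 3.2 threshold is my assumption about when the interface first shipped.

```python
import re

# Assumption: SupportsRuntimeFiltering first appeared in Spark 3.2.
MIN_SPARK_FOR_RUNTIME_FILTERING = (3, 2)


def parse_spark_version(version):
    """Return (major, minor) from a version string such as '3.1.3'."""
    match = re.match(r"(\d+)\.(\d+)", version)
    if match is None:
        raise ValueError(f"Unrecognized Spark version: {version!r}")
    return int(match.group(1)), int(match.group(2))


def has_runtime_filtering(version):
    """True if this Spark version is new enough to ship the interface."""
    return parse_spark_version(version) >= MIN_SPARK_FOR_RUNTIME_FILTERING
```

Inside the job you could log has_runtime_filtering(spark.version), where spark is the active SparkSession, to see which Spark the cluster really runs.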

Please change the definition to

pyspark_job = {
    "reference": {"project_id": PROJECT_ID},
    "placement": {"cluster_name": CLUSTER_NAME},
    "pyspark_job": {
        "main_python_file_uri": "gs://bucket-name/dependencies/main.py",
        "python_file_uris": [
            "gs://bucket-name/dependencies/config.py",
            "gs://bucket-name/dependencies/util.py",
        ],
        "jar_file_uris": [
            "gs://bucket-name/dependencies/mlflow-spark-1.30.0.jar",
            "gs://bucket-name/dependencies/spark-3.1-bigquery-0.31.1.jar",
            "gs://bucket-name/dependencies/s3mock_2.12-0.1.8.jar",
        ],
    },
}
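The connector jar in the definition above is named after the Spark minor version it targets. If you want to derive that name instead of hard-coding it, something like the following could work. It is a sketch: bigquery_connector_jar is a hypothetical helper, and the naming pattern is inferred only from the two file names in this thread, so check that a build actually exists for your Spark version.

```python
def bigquery_connector_jar(spark_version, connector_version):
    """Build a jar file name such as 'spark-3.1-bigquery-0.31.1.jar'.

    Only the major.minor part of the Spark version is used; the pattern is
    an assumption based on the file names quoted in this thread.
    """
    parts = spark_version.split(".")
    if len(parts) < 2 or not all(p.isdigit() for p in parts[:2]):
        raise ValueError(f"Unrecognized Spark version: {spark_version!r}")
    return f"spark-{parts[0]}.{parts[1]}-bigquery-{connector_version}.jar"
```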

Also, notice that the revised job definition leaves out the jars Spark already provides: scala-library and spark-sql. Including them may lead to other issues.
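If the jar list is built programmatically, the same cleanup can be applied with a small filter. This is a sketch under assumptions: drop_provided_jars and the prefix list are illustrative names, and the prefixes cover only the two artifacts mentioned here, not everything Spark bundles.

```python
# Assumption: file-name prefixes of artifacts the cluster's Spark already ships.
PROVIDED_PREFIXES = ("scala-library-", "spark-sql_")


def drop_provided_jars(jar_uris):
    """Return jar_uris without entries whose file name matches a provided artifact."""
    kept = []
    for uri in jar_uris:
        file_name = uri.rsplit("/", 1)[-1]  # last path segment of the URI
        if not file_name.startswith(PROVIDED_PREFIXES):
            kept.append(uri)
    return kept
```

Applied to the five jars from the question, this keeps the mlflow-spark, BigQuery connector, and s3mock entries and drops the other two.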

Posted by huangapple on 2023-06-08 04:57:05
Source link: https://go.coder-hub.com/76427072.html