Got "Class software.amazon.msk.auth.iam.IAMClientCallbackHandler could not be found" in Amazon EMR
Question
I have a Spark application. Here is the code that sinks data to Amazon MSK:
val query = df.writeStream
  .format("kafka")
  .option(
    "kafka.bootstrap.servers",
    "xxx.kafka.us-west-2.amazonaws.com:9098",
  )
  .option("kafka.security.protocol", "SASL_SSL")
  .option("kafka.sasl.mechanism", "AWS_MSK_IAM")
  .option("kafka.sasl.jaas.config", "software.amazon.msk.auth.iam.IAMLoginModule required;")
  .option("kafka.sasl.client.callback.handler.class", "software.amazon.msk.auth.iam.IAMClientCallbackHandler")
  .option("topic", "hm.motor.avro")
  .option("checkpointLocation", "/tmp/checkpoint")
  .start()
My build.sbt looks like this:
name := "IngestFromS3ToKafka"
version := "1.0"
scalaVersion := "2.12.17"

resolvers += "confluent" at "https://packages.confluent.io/maven/"

val sparkVersion = "3.3.1"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % sparkVersion % "provided",
  "org.apache.spark" %% "spark-sql" % sparkVersion % "provided",
  "org.apache.hadoop" % "hadoop-common" % "3.3.5" % "provided",
  "org.apache.hadoop" % "hadoop-aws" % "3.3.5" % "provided",
  "com.amazonaws" % "aws-java-sdk-bundle" % "1.12.475" % "provided",
  "org.apache.spark" %% "spark-avro" % sparkVersion,
  "org.apache.spark" %% "spark-sql-kafka-0-10" % sparkVersion,
  "io.delta" %% "delta-core" % "2.3.0",
  "za.co.absa" %% "abris" % "6.3.0"
)

ThisBuild / assemblyMergeStrategy := {
  // https://stackoverflow.com/a/67937671/2000548
  case PathList("module-info.class") => MergeStrategy.discard
  case x if x.endsWith("/module-info.class") => MergeStrategy.discard
  // https://stackoverflow.com/a/76129963/2000548
  case PathList("org", "apache", "spark", "unused", "UnusedStubClass.class") => MergeStrategy.first
  case x =>
    val oldStrategy = (ThisBuild / assemblyMergeStrategy).value
    oldStrategy(x)
}
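(Side note, not in the original post: the assemblyMergeStrategy, PathList, and MergeStrategy settings above come from the sbt-assembly plugin, so the project presumably also enables it in project/plugins.sbt, roughly as below; the plugin version shown is an assumption.)

// project/plugins.sbt -- needed for the merge-strategy settings above
// (the version number is an assumption; use whatever release you already have)
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "2.1.1")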
After I spark-submit the job to Amazon EMR, I got this error:
Caused by: org.apache.kafka.common.config.ConfigException: Invalid value software.amazon.msk.auth.iam.IAMClientCallbackHandler for configuration sasl.client.callback.handler.class: Class software.amazon.msk.auth.iam.IAMClientCallbackHandler could not be found.
at org.apache.kafka.common.config.ConfigDef.parseType(ConfigDef.java:744)
at org.apache.kafka.common.config.ConfigDef.parseValue(ConfigDef.java:490)
at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:483)
at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:108)
at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:129)
at org.apache.kafka.clients.producer.ProducerConfig.<init>(ProducerConfig.java:536)
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:291)
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:274)
at org.apache.spark.sql.kafka010.producer.InternalKafkaProducerPool.createKafkaProducer(InternalKafkaProducerPool.scala:136)
at org.apache.spark.sql.kafka010.producer.InternalKafkaProducerPool.$anonfun$acquire$1(InternalKafkaProducerPool.scala:83)
at scala.collection.mutable.HashMap.getOrElseUpdate(HashMap.scala:86)
at org.apache.spark.sql.kafka010.producer.InternalKafkaProducerPool.acquire(InternalKafkaProducerPool.scala:82)
at org.apache.spark.sql.kafka010.producer.InternalKafkaProducerPool$.acquire(InternalKafkaProducerPool.scala:198)
at org.apache.spark.sql.kafka010.KafkaDataWriter.write(KafkaDataWriter.scala:53)
at org.apache.spark.sql.kafka010.KafkaDataWriter.write(KafkaDataWriter.scala:42)
at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.$anonfun$run$1(WriteToDataSourceV2Exec.scala:442)
at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1550)
at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:480)
at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.$anonfun$writeWithV2$2(WriteToDataSourceV2Exec.scala:381)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:138)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1516)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
Answer 1
Score: 0
First, you need to add aws-msk-iam-auth to build.sbt. That library provides software.amazon.msk.auth.iam.IAMClientCallbackHandler and, unlike the Spark and Hadoop dependencies, it is not provided by EMR, so it has to end up in the assembly jar.
In my case, I also needed to add an assembly merge strategy for io.netty.versions.properties.
libraryDependencies ++= Seq(
  // ...
  "software.amazon.msk" % "aws-msk-iam-auth" % "1.1.6"
)

ThisBuild / assemblyMergeStrategy := {
  // ...
  // https://stackoverflow.com/a/54634225/2000548
  case x if x.contains("io.netty.versions.properties") => MergeStrategy.discard
  case x =>
    val oldStrategy = (ThisBuild / assemblyMergeStrategy).value
    oldStrategy(x)
}
Then the error is gone!
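Not part of the original answer, but a quick way to sanity-check the fix: after rebuilding with sbt assembly, try loading the handler class from a spark-shell started with the new jar. The jar path below is an assumption based on the project name and version in build.sbt.

// spark-shell --jars target/scala-2.12/IngestFromS3ToKafka-assembly-1.0.jar
// If this returns the Class object instead of throwing ClassNotFoundException,
// the Kafka producer will also be able to resolve
// kafka.sasl.client.callback.handler.class at runtime.
Class.forName("software.amazon.msk.auth.iam.IAMClientCallbackHandler")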