Got "Class software.amazon.msk.auth.iam.IAMClientCallbackHandler could not be found" in Amazon EMR

Question

I have a Spark application. Here is the code that sinks to Amazon MSK:

val query = df.writeStream
  .format("kafka")
  .option(
    "kafka.bootstrap.servers",
    "xxx.kafka.us-west-2.amazonaws.com:9098",
  )
  .option("kafka.security.protocol", "SASL_SSL")
  .option("kafka.sasl.mechanism", "AWS_MSK_IAM")
  .option("kafka.sasl.jaas.config", "software.amazon.msk.auth.iam.IAMLoginModule required;")
  .option("kafka.sasl.client.callback.handler.class", "software.amazon.msk.auth.iam.IAMClientCallbackHandler")
  .option("topic", "hm.motor.avro")
  .option("checkpointLocation", "/tmp/checkpoint")
  .start()
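Spark's Kafka sink forwards every option prefixed with `kafka.` to the underlying Kafka producer with the prefix stripped, so the four SASL options above are plain Kafka client settings. If the same MSK IAM settings are needed in several readers and writers, they can be kept in one place. This is only a sketch: `MskIamOptions` is a hypothetical helper, not part of Spark or the MSK library, and it does not by itself make the MSK classes available at runtime.

```scala
// Hypothetical helper that collects the Kafka client options used for MSK IAM auth.
// Spark strips the "kafka." prefix before handing these to the Kafka producer/consumer.
object MskIamOptions {
  def apply(bootstrapServers: String): Map[String, String] = Map(
    "kafka.bootstrap.servers" -> bootstrapServers,
    "kafka.security.protocol" -> "SASL_SSL",
    "kafka.sasl.mechanism" -> "AWS_MSK_IAM",
    "kafka.sasl.jaas.config" -> "software.amazon.msk.auth.iam.IAMLoginModule required;",
    // This class must be on the executor classpath, otherwise the producer fails to start.
    "kafka.sasl.client.callback.handler.class" -> "software.amazon.msk.auth.iam.IAMClientCallbackHandler"
  )
}
```

It could then be passed with `df.writeStream.format("kafka").options(MskIamOptions("xxx.kafka.us-west-2.amazonaws.com:9098"))`, since `DataStreamWriter.options` accepts a `Map[String, String]`, followed by the `topic` and `checkpointLocation` options as before.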

My build.sbt looks like this:

name := "IngestFromS3ToKafka"
version := "1.0"
scalaVersion := "2.12.17"
resolvers += "confluent" at "https://packages.confluent.io/maven/"

val sparkVersion = "3.3.1"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % sparkVersion % "provided",
  "org.apache.spark" %% "spark-sql" % sparkVersion % "provided",
  "org.apache.hadoop" % "hadoop-common" % "3.3.5" % "provided",
  "org.apache.hadoop" % "hadoop-aws" % "3.3.5" % "provided",
  "com.amazonaws" % "aws-java-sdk-bundle" % "1.12.475" % "provided",

  "org.apache.spark" %% "spark-avro" % sparkVersion,
  "org.apache.spark" %% "spark-sql-kafka-0-10" % sparkVersion,
  "io.delta" %% "delta-core" % "2.3.0",
  "za.co.absa" %% "abris" % "6.3.0"
)

ThisBuild / assemblyMergeStrategy := {
  // https://stackoverflow.com/a/67937671/2000548
  case PathList("module-info.class") => MergeStrategy.discard
  case x if x.endsWith("/module-info.class") => MergeStrategy.discard
  // https://stackoverflow.com/a/76129963/2000548
  case PathList("org", "apache", "spark", "unused", "UnusedStubClass.class") => MergeStrategy.first
  case x =>
    val oldStrategy = (ThisBuild / assemblyMergeStrategy).value
    oldStrategy(x)
}

After I spark-submit it to Amazon EMR, I get this error:

Caused by: org.apache.kafka.common.config.ConfigException: Invalid value software.amazon.msk.auth.iam.IAMClientCallbackHandler for configuration sasl.client.callback.handler.class: Class software.amazon.msk.auth.iam.IAMClientCallbackHandler could not be found.
  at org.apache.kafka.common.config.ConfigDef.parseType(ConfigDef.java:744)
  at org.apache.kafka.common.config.ConfigDef.parseValue(ConfigDef.java:490)
  at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:483)
  at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:108)
  at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:129)
  at org.apache.kafka.clients.producer.ProducerConfig.<init>(ProducerConfig.java:536)
  at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:291)
  at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:274)
  at org.apache.spark.sql.kafka010.producer.InternalKafkaProducerPool.createKafkaProducer(InternalKafkaProducerPool.scala:136)
  at org.apache.spark.sql.kafka010.producer.InternalKafkaProducerPool.$anonfun$acquire$1(InternalKafkaProducerPool.scala:83)
  at scala.collection.mutable.HashMap.getOrElseUpdate(HashMap.scala:86)
  at org.apache.spark.sql.kafka010.producer.InternalKafkaProducerPool.acquire(InternalKafkaProducerPool.scala:82)
  at org.apache.spark.sql.kafka010.producer.InternalKafkaProducerPool$.acquire(InternalKafkaProducerPool.scala:198)
  at org.apache.spark.sql.kafka010.KafkaDataWriter.write(KafkaDataWriter.scala:53)
  at org.apache.spark.sql.kafka010.KafkaDataWriter.write(KafkaDataWriter.scala:42)
  at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.$anonfun$run$1(WriteToDataSourceV2Exec.scala:442)
  at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1550)
  at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:480)
  at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.$anonfun$writeWithV2$2(WriteToDataSourceV2Exec.scala:381)
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
  at org.apache.spark.scheduler.Task.run(Task.scala:138)
  at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
  at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1516)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
  at java.lang.Thread.run(Thread.java:750)
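The trace shows Kafka's `ConfigDef.parseType` rejecting the class name while the producer is being created inside `Executor$TaskRunner`, which usually means the class is simply not on that JVM's classpath rather than misspelled. A minimal diagnostic sketch is to try loading the MSK classes yourself; `ClasspathCheck` is a hypothetical name, and the class-loader choice only approximates what the Kafka client does.

```scala
import scala.util.Try

object ClasspathCheck {
  // Prefer the thread context class loader, falling back to this object's loader,
  // roughly mirroring how the Kafka client resolves class-typed config values.
  private def loader: ClassLoader =
    Option(Thread.currentThread().getContextClassLoader).getOrElse(getClass.getClassLoader)

  // True if the class can be found; initialize = false avoids running static initializers.
  def isOnClasspath(className: String): Boolean =
    Try(Class.forName(className, false, loader)).isSuccess

  def main(args: Array[String]): Unit = {
    val required = Seq(
      "software.amazon.msk.auth.iam.IAMLoginModule",
      "software.amazon.msk.auth.iam.IAMClientCallbackHandler"
    )
    required.foreach { cls =>
      val status = if (isOnClasspath(cls)) "found" else "MISSING"
      println(s"$cls -> $status")
    }
  }
}
```

Because the failure happens on an executor, the check is most telling there, for example `spark.sparkContext.parallelize(Seq(1), 1).map(_ => ClasspathCheck.isOnClasspath("software.amazon.msk.auth.iam.IAMClientCallbackHandler")).collect()`, assuming `spark` is your `SparkSession`.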

Answer 1

Score: 0

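First, add aws-msk-iam-auth to build.sbt. IAMLoginModule and IAMClientCallbackHandler live in that library, not in spark-sql-kafka-0-10, so it has to be bundled into the assembly jar (that is, not marked "provided").

In my case, I also needed to add an assembly merge strategy for io.netty.versions.properties. aws-msk-iam-auth pulls in Netty transitively, and several Netty jars each ship their own copy of that file, which makes sbt assembly fail with a deduplicate error.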

libraryDependencies ++= Seq(
  // ...
  "software.amazon.msk" % "aws-msk-iam-auth" % "1.1.6"
)

ThisBuild / assemblyMergeStrategy := {
  // ...

  // https://stackoverflow.com/a/54634225/2000548
  case x if x.contains("io.netty.versions.properties") => MergeStrategy.discard
  case x =>
    val oldStrategy = (ThisBuild / assemblyMergeStrategy).value
    oldStrategy(x)
}
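To confirm the fix before submitting to EMR, one can check that the assembled jar really contains the MSK IAM classes. This is a sketch under assumptions: `FatJarCheck` is a hypothetical name, and the default jar path assumes sbt-assembly's default `<name>-assembly-<version>.jar` naming for this project. It uses only `java.util.jar.JarFile`, so it also compiles on Scala 2.12.

```scala
import java.io.File
import java.util.jar.JarFile

object FatJarCheck {
  // Converts a binary class name to the path it has inside a jar.
  def entryPath(className: String): String = className.replace('.', '/') + ".class"

  // Returns the classes from `classNames` that have no entry in the jar.
  def missingClasses(jarPath: String, classNames: Seq[String]): List[String] = {
    val jar = new JarFile(new File(jarPath))
    try classNames.filter(cls => jar.getEntry(entryPath(cls)) == null).toList
    finally jar.close()
  }

  def main(args: Array[String]): Unit = {
    // Assumed default output path; pass the real path as the first argument if it differs.
    val jarPath = args.headOption.getOrElse("target/scala-2.12/IngestFromS3ToKafka-assembly-1.0.jar")
    val missing = missingClasses(jarPath, Seq(
      "software.amazon.msk.auth.iam.IAMLoginModule",
      "software.amazon.msk.auth.iam.IAMClientCallbackHandler"
    ))
    if (missing.isEmpty) println("All MSK IAM classes are present")
    else println(s"Missing from $jarPath: ${missing.mkString(", ")}")
  }
}
```

Listing the jar with `jar tf` and searching for `software/amazon/msk` gives the same answer; the Scala version just makes the check easy to script.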

Then the error is gone!

huangapple
  • Posted on 2023-06-02 04:56:43
  • When reposting, please keep the link to this article: https://go.coder-hub.com/76385650.html