Kafka Streams internal repartition topics not being created

Question


I'm running a Kafka Streams application that creates two internal repartition topics for windowed aggregations occurring on different branches of the topology.
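For context, a topology of this shape looks roughly like the sketch below. The input topic, serdes, and window size are hypothetical; only the store names mirror the repartition topics that show up in the logs further down. Re-keying a stream before a grouped, windowed aggregation is what causes Kafka Streams to create an internal <application.id>-<storeName>-repartition topic for each branch.

```java
import java.time.Duration;
import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.TimeWindows;

public class TopologySketch {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> input = builder.stream("input-topic");

        // Branch 1: selectKey changes the key, so the following groupByKey
        // forces a repartition via <app-id>-GroupedResults_1_Store-repartition.
        input.selectKey((k, v) -> v)
             .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
             .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5)))
             .count(Materialized.as("GroupedResults_1_Store"));

        // Branch 2: the same pattern on a second branch yields a second
        // repartition topic, <app-id>-GroupedResults_2_Store-repartition.
        input.selectKey((k, v) -> v.toUpperCase())
             .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
             .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5)))
             .count(Materialized.as("GroupedResults_2_Store"));

        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "namespace.application");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        new KafkaStreams(builder.build(), props).start();
    }
}
```

With an application.id of namespace.application, the two store names above produce exactly the repartition topic names that appear in the warnings below.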

When I run the Kafka Streams application locally or in a docker-compose environment, everything works fine: the AdminClient creates the internal topics when the consumers attempt to read from them.

However, when I deploy this application to a Nomad-orchestrated environment where the Kafka brokers use SSL, the application doesn't create the internal topics, despite the SSL certificates being supplied in the global streams configuration.
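For reference, the certificates are supplied through the global streams configuration along these lines (a minimal sketch; the keystore and truststore paths match the config dump below, while the passwords are placeholders):

```java
import java.util.Properties;

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.streams.StreamsConfig;

final class StreamsSslConfig {

    // Global streams configuration; Kafka Streams passes these settings
    // down to its internal consumer, producer, and AdminClient.
    static Properties streamsProperties() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "namespace.application");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
                "kafka-01.aws:9093,kafka-02.aws:9093,kafka-03.aws:9093");
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
        props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, "/tmp/kafka_keystore.jks");
        props.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, "<keystore-password>");
        props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "/tmp/kafka_truststore.jks");
        props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "<truststore-password>");
        return props;
    }
}
```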

I can see from the logs that the AdminClient seems to be correctly configured:

    bootstrap.servers = [kafka-01.aws:9093, kafka-02.aws:9093, kafka-03.aws:9093]
    client.dns.lookup = use_all_dns_ips
    client.id = namespace.application-cd79b0a0-ffe2-4694-a70e-d023d7d12908-admin
    connections.max.idle.ms = 300000
    default.api.timeout.ms = 60000
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    receive.buffer.bytes = 65536
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 30000
    retries = 2147483647
    retry.backoff.ms = 100
    sasl.client.callback.handler.class = null
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.login.callback.handler.class = null
    sasl.login.class = null
    sasl.login.connect.timeout.ms = null
    sasl.login.read.timeout.ms = null
    sasl.login.refresh.buffer.seconds = 300
    sasl.login.refresh.min.period.seconds = 60
    sasl.login.refresh.window.factor = 0.8
    sasl.login.refresh.window.jitter = 0.05
    sasl.login.retry.backoff.max.ms = 10000
    sasl.login.retry.backoff.ms = 100
    sasl.mechanism = GSSAPI
    sasl.oauthbearer.clock.skew.seconds = 30
    sasl.oauthbearer.expected.audience = null
    sasl.oauthbearer.expected.issuer = null
    sasl.oauthbearer.jwks.endpoint.refresh.ms = 3600000
    sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms = 10000
    sasl.oauthbearer.jwks.endpoint.retry.backoff.ms = 100
    sasl.oauthbearer.jwks.endpoint.url = null
    sasl.oauthbearer.scope.claim.name = scope
    sasl.oauthbearer.sub.claim.name = sub
    sasl.oauthbearer.token.endpoint.url = null
    security.protocol = SSL
    security.providers = null
    send.buffer.bytes = 131072
    socket.connection.setup.timeout.max.ms = 30000
    socket.connection.setup.timeout.ms = 10000
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
    ssl.endpoint.identification.algorithm =
    ssl.engine.factory.class = null
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.certificate.chain = null
    ssl.keystore.key = null
    ssl.keystore.location = /tmp/kafka_keystore.jks
    ssl.keystore.password = [hidden]
    ssl.keystore.type = JKS
    ssl.protocol = TLSv1.3
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.certificates = null
    ssl.truststore.location = /tmp/kafka_truststore.jks
    ssl.truststore.password = [hidden]
    ssl.truststore.type = JKS

The application eventually ends up being killed, and I see the following logs. Nowhere in the logs do I see the AdminClient being called to create the internal topics:

    [namespace.application-cd79b0a0-ffe2-4694-a70e-d023d7d12908-StreamThread-1] WARN org.apache.kafka.clients.NetworkClient - [Consumer clientId=namespace.application-cd79b0a0-ffe2-4694-a70e-d023d7d12908-StreamThread-1-consumer, groupId=namespace.application] Error while fetching metadata with correlation id 8 : {namespace.application-GroupedResults_2_Store-repartition=UNKNOWN_TOPIC_OR_PARTITION, namespace.application-GroupedResults_1_Store-repartition=UNKNOWN_TOPIC_OR_PARTITION}
    [namespace.application-cd79b0a0-ffe2-4694-a70e-d023d7d12908-StreamThread-1] WARN org.apache.kafka.clients.NetworkClient - [Consumer clientId=namespace.application-cd79b0a0-ffe2-4694-a70e-d023d7d12908-StreamThread-1-consumer, groupId=namespace.application] Error while fetching metadata with correlation id 9 : {namespace.application-GroupedResults_2_Store-repartition=UNKNOWN_TOPIC_OR_PARTITION, namespace.application-GroupedResults_1_Store-repartition=UNKNOWN_TOPIC_OR_PARTITION}
    [namespace.application-cd79b0a0-ffe2-4694-a70e-d023d7d12908-StreamThread-1] WARN org.apache.kafka.clients.NetworkClient - [Consumer clientId=namespace.application-cd79b0a0-ffe2-4694-a70e-d023d7d12908-StreamThread-1-consumer, groupId=namespace.application] Error while fetching metadata with correlation id 10 : {namespace.application-GroupedResults_2_Store-repartition=UNKNOWN_TOPIC_OR_PARTITION, namespace.application-GroupedResults_1_Store-repartition=UNKNOWN_TOPIC_OR_PARTITION}
    [namespace.application-cd79b0a0-ffe2-4694-a70e-d023d7d12908-StreamThread-1] WARN org.apache.kafka.clients.NetworkClient - [Consumer clientId=namespace.application-cd79b0a0-ffe2-4694-a70e-d023d7d12908-StreamThread-1-consumer, groupId=namespace.application] Error while fetching metadata with correlation id 11 : {namespace.application-GroupedResults_2_Store-repartition=UNKNOWN_TOPIC_OR_PARTITION, namespace.application-GroupedResults_1_Store-repartition=UNKNOWN_TOPIC_OR_PARTITION}

Any idea what might be wrong here? I have another AdminClient bean (it's a Spring Boot application) that I create in the application configuration to help create the output topic and a dead-letter topic. It succeeds in creating those topics and has similar config to the internal Kafka Streams AdminClient, so I'm not sure why the internal topics aren't created. Any help is greatly appreciated.
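That bean looks roughly like the sketch below. The topic names and partition/replication settings are hypothetical and the SSL properties are elided; the point is that this client is configured explicitly rather than relying on Spring's auto-configuration:

```java
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class TopicCreationConfig {

    // Explicitly configured AdminClient used at startup to create the
    // output and dead-letter topics (fire-and-forget here; real code
    // would wait on the CreateTopicsResult and tolerate TopicExists).
    @Bean
    public AdminClient adminClient() {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
                "kafka-01.aws:9093,kafka-02.aws:9093,kafka-03.aws:9093");
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
        // ... ssl.keystore.* and ssl.truststore.* settings as in the streams config ...
        AdminClient client = AdminClient.create(props);
        client.createTopics(List.of(
                new NewTopic("namespace.application-output", 6, (short) 3),
                new NewTopic("namespace.application-dlt", 6, (short) 3)));
        return client;
    }
}
```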

Answer 1 (score: 2)


Found the issue. Unfortunately, I was also using Spring Boot and spring-kafka as dependencies in this application. According to the documentation (https://docs.spring.io/spring-kafka/reference/html/#configuring-topics), Spring Boot starts a default instance of KafkaAdmin (unless you set its autoCreate property to false; it's true by default).

Since I was not injecting the SSL properties into the Spring properties, this default AdminClient was not configured for SSL, and it superseded the one created by Kafka Streams (which was correctly configured for SSL).

Once I set the autoCreate property to false, the default Spring AdminClient no longer tries to create topics, so I don't have to inject additional SSL properties, and everything works fine because the AdminClient created by Kafka Streams is used.
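With spring-kafka, one way to do that is to declare your own KafkaAdmin bean and disable autoCreate on it, which replaces Spring Boot's auto-configured instance (a sketch; only the broker addresses are taken from the question):

```java
import java.util.Map;

import org.apache.kafka.clients.admin.AdminClientConfig;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.KafkaAdmin;

@Configuration
public class KafkaAdminConfig {

    // Overrides Boot's default KafkaAdmin and turns off automatic topic
    // creation, leaving topic management to the Kafka Streams internal
    // AdminClient, which carries the SSL settings.
    @Bean
    public KafkaAdmin kafkaAdmin() {
        KafkaAdmin admin = new KafkaAdmin(Map.of(
                AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
                "kafka-01.aws:9093,kafka-02.aws:9093,kafka-03.aws:9093"));
        admin.setAutoCreate(false);
        return admin;
    }
}
```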
