Kafka streams internal repartition topics not being created
Question
I'm running a Kafka Streams application which creates two internal repartition topics for the windowed aggregations that occur on different branches of the topology.
When I run the Kafka Streams application locally or in a docker-compose environment, everything seems to work fine: the AdminClient creates the internal topics when the consumers attempt to read from them.
However, when deploying this application to a Nomad-orchestrated environment where the Kafka brokers use SSL, the application doesn't create the internal topics, despite the SSL certificates being supplied in the global streams configuration.
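For context, supplying the certificates in the global streams configuration looks roughly like the sketch below. The hosts, paths, and passwords are placeholders taken from the logged config; the relevant point is that Kafka Streams forwards these properties to the clients it instantiates internally, including the AdminClient that is supposed to create the repartition topics:

```java
import java.util.Properties;

public class StreamsSslConfig {

    // Global Kafka Streams configuration. Kafka Streams hands these
    // properties to the clients it creates internally (consumer, producer,
    // and the AdminClient used to create repartition topics), so the
    // ssl.* settings below should reach that AdminClient as well.
    public static Properties buildConfig() {
        Properties props = new Properties();
        props.put("application.id", "namespace.application");
        props.put("bootstrap.servers",
                "kafka-01.aws:9093,kafka-02.aws:9093,kafka-03.aws:9093");
        props.put("security.protocol", "SSL");
        props.put("ssl.keystore.location", "/tmp/kafka_keystore.jks");
        props.put("ssl.keystore.password", "changeit");   // placeholder
        props.put("ssl.truststore.location", "/tmp/kafka_truststore.jks");
        props.put("ssl.truststore.password", "changeit"); // placeholder
        return props;
    }

    public static void main(String[] args) {
        System.out.println(buildConfig().getProperty("security.protocol"));
    }
}
```

These properties would then be passed to the `KafkaStreams` constructor alongside the topology.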
I can see from the logs that the AdminClient seems to be correctly configured:
bootstrap.servers = [kafka-01.aws:9093, kafka-02.aws:9093, kafka-03.aws:9093]
client.dns.lookup = use_all_dns_ips
client.id = namespace.application-cd79b0a0-ffe2-4694-a70e-d023d7d12908-admin
connections.max.idle.ms = 300000
default.api.timeout.ms = 60000
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retries = 2147483647
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.connect.timeout.ms = null
sasl.login.read.timeout.ms = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.login.retry.backoff.max.ms = 10000
sasl.login.retry.backoff.ms = 100
sasl.mechanism = GSSAPI
sasl.oauthbearer.clock.skew.seconds = 30
sasl.oauthbearer.expected.audience = null
sasl.oauthbearer.expected.issuer = null
sasl.oauthbearer.jwks.endpoint.refresh.ms = 3600000
sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms = 10000
sasl.oauthbearer.jwks.endpoint.retry.backoff.ms = 100
sasl.oauthbearer.jwks.endpoint.url = null
sasl.oauthbearer.scope.claim.name = scope
sasl.oauthbearer.sub.claim.name = sub
sasl.oauthbearer.token.endpoint.url = null
security.protocol = SSL
security.providers = null
send.buffer.bytes = 131072
socket.connection.setup.timeout.max.ms = 30000
socket.connection.setup.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
ssl.endpoint.identification.algorithm =
ssl.engine.factory.class = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.certificate.chain = null
ssl.keystore.key = null
ssl.keystore.location = /tmp/kafka_keystore.jks
ssl.keystore.password = [hidden]
ssl.keystore.type = JKS
ssl.protocol = TLSv1.3
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.certificates = null
ssl.truststore.location = /tmp/kafka_truststore.jks
ssl.truststore.password = [hidden]
ssl.truststore.type = JKS
Basically, the application ends up being killed, and I see the following logs. I don't see the AdminClient being called at all in the logs to create the internal topics:
[namespace.application-cd79b0a0-ffe2-4694-a70e-d023d7d12908-StreamThread-1] WARN org.apache.kafka.clients.NetworkClient - [Consumer clientId=namespace.application-cd79b0a0-ffe2-4694-a70e-d023d7d12908-StreamThread-1-consumer, groupId=namespace.application] Error while fetching metadata with correlation id 8 : {namespace.application-GroupedResults_2_Store-repartition=UNKNOWN_TOPIC_OR_PARTITION, namespace.application-GroupedResults_1_Store-repartition=UNKNOWN_TOPIC_OR_PARTITION}
[namespace.application-cd79b0a0-ffe2-4694-a70e-d023d7d12908-StreamThread-1] WARN org.apache.kafka.clients.NetworkClient - [Consumer clientId=namespace.application-cd79b0a0-ffe2-4694-a70e-d023d7d12908-StreamThread-1-consumer, groupId=namespace.application] Error while fetching metadata with correlation id 9 : {namespace.application-GroupedResults_2_Store-repartition=UNKNOWN_TOPIC_OR_PARTITION, namespace.application-GroupedResults_1_Store-repartition=UNKNOWN_TOPIC_OR_PARTITION}
[namespace.application-cd79b0a0-ffe2-4694-a70e-d023d7d12908-StreamThread-1] WARN org.apache.kafka.clients.NetworkClient - [Consumer clientId=namespace.application-cd79b0a0-ffe2-4694-a70e-d023d7d12908-StreamThread-1-consumer, groupId=namespace.application] Error while fetching metadata with correlation id 10 : {namespace.application-GroupedResults_2_Store-repartition=UNKNOWN_TOPIC_OR_PARTITION, namespace.application-GroupedResults_1_Store-repartition=UNKNOWN_TOPIC_OR_PARTITION}
[namespace.application-cd79b0a0-ffe2-4694-a70e-d023d7d12908-StreamThread-1] WARN org.apache.kafka.clients.NetworkClient - [Consumer clientId=namespace.application-cd79b0a0-ffe2-4694-a70e-d023d7d12908-StreamThread-1-consumer, groupId=namespace.application] Error while fetching metadata with correlation id 11 : {namespace.application-GroupedResults_2_Store-repartition=UNKNOWN_TOPIC_OR_PARTITION, namespace.application-GroupedResults_1_Store-repartition=UNKNOWN_TOPIC_OR_PARTITION}
Any idea what might be wrong here? I have another AdminClient bean (it's a Spring Boot application) that I create in the application configuration to help create the output topic and a dead-letter topic. It succeeds in creating those topics and has a similar configuration to the internal Kafka Streams AdminClient, so I'm not sure why the internal topics aren't being created. Any help is greatly appreciated.
Answer 1
Score: 2
Found the issue: unfortunately, I was also using Spring Boot and spring-kafka as dependencies in this application. According to the documentation (https://docs.spring.io/spring-kafka/reference/html/#configuring-topics), Spring Boot starts a default instance of KafkaAdmin (unless you set its autoCreate property to false; it is true by default).
As I was not injecting the SSL properties into the Spring properties, this default AdminClient was not configured for SSL, and it was superseding the one created by Kafka Streams (which was correctly configured for SSL).
Once I set the autoCreate property to false, the default Spring AdminClient is no longer used to create topics, so I don't have to inject additional SSL properties, and everything works fine, as the AdminClient created by Kafka Streams is being used.
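For anyone hitting the same problem, the alternative fix mentioned above (injecting the SSL settings into the Spring properties so the auto-configured KafkaAdmin can also reach the brokers) would look roughly like this in application.properties. This is a sketch using Spring Boot's standard spring.kafka.* keys; paths and passwords are placeholders:

```properties
# Give Spring Boot's auto-configured KafkaAdmin the same SSL settings that
# the Kafka Streams configuration already has. Paths and passwords below
# are placeholders.
spring.kafka.security.protocol=SSL
spring.kafka.ssl.key-store-location=file:/tmp/kafka_keystore.jks
spring.kafka.ssl.key-store-password=changeit
spring.kafka.ssl.trust-store-location=file:/tmp/kafka_truststore.jks
spring.kafka.ssl.trust-store-password=changeit
```

The fix the answer actually went with, disabling topic auto-creation, is applied on the KafkaAdmin bean itself via its setAutoCreate(false) setter.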