Kafka Streams internal repartition topics not being created

Question

I'm running a Kafka Streams application which creates 2 internal repartition topics for windowed aggregations that occur on different branches of the topology.

When I run the Kafka Streams application locally or in a docker-compose environment, everything seems to work fine: the AdminClient creates the internal topics when the consumers attempt to read from them.

However, when I deploy this application to a Nomad-orchestrated environment where the Kafka brokers use SSL, the application doesn't create the internal topics, even though the SSL certificates are supplied in the global streams configuration.

I can see from the logs that the AdminClient seems to be correctly configured:

	bootstrap.servers = [kafka-01.aws:9093, kafka-02.aws:9093, kafka-03.aws:9093]
	client.dns.lookup = use_all_dns_ips
	client.id = namespace.application-cd79b0a0-ffe2-4694-a70e-d023d7d12908-admin
	connections.max.idle.ms = 300000
	default.api.timeout.ms = 60000
	metadata.max.age.ms = 300000
	metric.reporters = []
	metrics.num.samples = 2
	metrics.recording.level = INFO
	metrics.sample.window.ms = 30000
	receive.buffer.bytes = 65536
	reconnect.backoff.max.ms = 1000
	reconnect.backoff.ms = 50
	request.timeout.ms = 30000
	retries = 2147483647
	retry.backoff.ms = 100
	sasl.client.callback.handler.class = null
	sasl.jaas.config = null
	sasl.kerberos.kinit.cmd = /usr/bin/kinit
	sasl.kerberos.min.time.before.relogin = 60000
	sasl.kerberos.service.name = null
	sasl.kerberos.ticket.renew.jitter = 0.05
	sasl.kerberos.ticket.renew.window.factor = 0.8
	sasl.login.callback.handler.class = null
	sasl.login.class = null
	sasl.login.connect.timeout.ms = null
	sasl.login.read.timeout.ms = null
	sasl.login.refresh.buffer.seconds = 300
	sasl.login.refresh.min.period.seconds = 60
	sasl.login.refresh.window.factor = 0.8
	sasl.login.refresh.window.jitter = 0.05
	sasl.login.retry.backoff.max.ms = 10000
	sasl.login.retry.backoff.ms = 100
	sasl.mechanism = GSSAPI
	sasl.oauthbearer.clock.skew.seconds = 30
	sasl.oauthbearer.expected.audience = null
	sasl.oauthbearer.expected.issuer = null
	sasl.oauthbearer.jwks.endpoint.refresh.ms = 3600000
	sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms = 10000
	sasl.oauthbearer.jwks.endpoint.retry.backoff.ms = 100
	sasl.oauthbearer.jwks.endpoint.url = null
	sasl.oauthbearer.scope.claim.name = scope
	sasl.oauthbearer.sub.claim.name = sub
	sasl.oauthbearer.token.endpoint.url = null
	security.protocol = SSL
	security.providers = null
	send.buffer.bytes = 131072
	socket.connection.setup.timeout.max.ms = 30000
	socket.connection.setup.timeout.ms = 10000
	ssl.cipher.suites = null
	ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
	ssl.endpoint.identification.algorithm = 
	ssl.engine.factory.class = null
	ssl.key.password = null
	ssl.keymanager.algorithm = SunX509
	ssl.keystore.certificate.chain = null
	ssl.keystore.key = null
	ssl.keystore.location = /tmp/kafka_keystore.jks
	ssl.keystore.password = [hidden]
	ssl.keystore.type = JKS
	ssl.protocol = TLSv1.3
	ssl.provider = null
	ssl.secure.random.implementation = null
	ssl.trustmanager.algorithm = PKIX
	ssl.truststore.certificates = null
	ssl.truststore.location = /tmp/kafka_truststore.jks
	ssl.truststore.password = [hidden]
	ssl.truststore.type = JKS

The application ends up being killed. In the logs I don't see the AdminClient being called to create the internal topics at all; I only see the following warnings:

[namespace.application-cd79b0a0-ffe2-4694-a70e-d023d7d12908-StreamThread-1] WARN org.apache.kafka.clients.NetworkClient - [Consumer clientId=namespace.application-cd79b0a0-ffe2-4694-a70e-d023d7d12908-StreamThread-1-consumer, groupId=namespace.application] Error while fetching metadata with correlation id 8 : {namespace.application-GroupedResults_2_Store-repartition=UNKNOWN_TOPIC_OR_PARTITION, namespace.application-GroupedResults_1_Store-repartition=UNKNOWN_TOPIC_OR_PARTITION}
[namespace.application-cd79b0a0-ffe2-4694-a70e-d023d7d12908-StreamThread-1] WARN org.apache.kafka.clients.NetworkClient - [Consumer clientId=namespace.application-cd79b0a0-ffe2-4694-a70e-d023d7d12908-StreamThread-1-consumer, groupId=namespace.application] Error while fetching metadata with correlation id 9 : {namespace.application-GroupedResults_2_Store-repartition=UNKNOWN_TOPIC_OR_PARTITION, namespace.application-GroupedResults_1_Store-repartition=UNKNOWN_TOPIC_OR_PARTITION}
[namespace.application-cd79b0a0-ffe2-4694-a70e-d023d7d12908-StreamThread-1] WARN org.apache.kafka.clients.NetworkClient - [Consumer clientId=namespace.application-cd79b0a0-ffe2-4694-a70e-d023d7d12908-StreamThread-1-consumer, groupId=namespace.application] Error while fetching metadata with correlation id 10 : {namespace.application-GroupedResults_2_Store-repartition=UNKNOWN_TOPIC_OR_PARTITION, namespace.application-GroupedResults_1_Store-repartition=UNKNOWN_TOPIC_OR_PARTITION}
[namespace.application-cd79b0a0-ffe2-4694-a70e-d023d7d12908-StreamThread-1] WARN org.apache.kafka.clients.NetworkClient - [Consumer clientId=namespace.application-cd79b0a0-ffe2-4694-a70e-d023d7d12908-StreamThread-1-consumer, groupId=namespace.application] Error while fetching metadata with correlation id 11 : {namespace.application-GroupedResults_2_Store-repartition=UNKNOWN_TOPIC_OR_PARTITION, namespace.application-.GroupedResults_1_Store-repartition=UNKNOWN_TOPIC_OR_PARTITION}

Any idea what might be wrong here? I have another AdminClient bean (it's a Spring Boot application) that I create in the application configuration to help create the output topic and a dead letter topic. It succeeds in creating those topics and has a similar config to the internal Kafka Streams AdminClient, so I'm not sure why the internal topics aren't created. Any help is greatly appreciated.
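For reference, the topic names in the warnings above follow the Kafka Streams convention for internal repartition topics: `<application.id>-<name>-repartition`. The sketch below rebuilds the expected names and compares them with a list of topics that actually exist, which can help confirm exactly which topics are missing. It assumes `namespace.application` is the `application.id` and that `GroupedResults_1_Store` and `GroupedResults_2_Store` are the names used in the topology (both read off the log lines). The class and method names are hypothetical, and the set of existing topics has to be supplied by the caller.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class InternalTopicNames {

    // Kafka Streams prefixes internal topics with the application.id and
    // appends "-repartition" for repartition topics.
    public static String repartitionTopic(String applicationId, String name) {
        return applicationId + "-" + name + "-repartition";
    }

    // Returns the expected topics that are absent from the existing set.
    public static List<String> missing(List<String> expected, Set<String> existing) {
        List<String> result = new ArrayList<>();
        for (String topic : expected) {
            if (!existing.contains(topic)) {
                result.add(topic);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Assumed values, taken from the warnings in the question.
        String applicationId = "namespace.application";
        List<String> expected = Arrays.asList(
                repartitionTopic(applicationId, "GroupedResults_1_Store"),
                repartitionTopic(applicationId, "GroupedResults_2_Store"));

        // Placeholder: in practice this would be the broker's topic list.
        Set<String> existing = new HashSet<>();

        for (String topic : missing(expected, existing)) {
            System.out.println("missing: " + topic);
        }
    }
}
```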

Answer 1

Score: 2

Found the issue: I was also using Spring Boot and spring-kafka as dependencies in this application. According to the documentation (https://docs.spring.io/spring-kafka/reference/html/#configuring-topics), Spring Boot starts a default instance of KafkaAdmin (unless you set the autoCreate property to false; it is true by default).

Because I was not injecting the SSL properties into the Spring properties, this default AdminClient was not configured for SSL, and it was superseding the one created by Kafka Streams (which was correctly configured for SSL).

Once I set the autoCreate property to false, the default Spring AdminClient is no longer created, so I don't have to inject additional SSL properties. Everything works fine because the AdminClient created by Kafka Streams is now the one being used.
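If the Spring-managed admin is still needed, the other route mentioned above is to give it the same SSL settings as Kafka Streams. Below is a minimal sketch of keeping those settings in one place so they can be merged into each client's configuration. The property keys are the ones shown in the AdminClient log in the question; the class name, method names, and placeholder passwords are hypothetical, and how the resulting map gets handed to Spring or to Kafka Streams is left to the application.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class SharedSslConfig {

    // Builds the SSL-related client settings once, so every Kafka client
    // in the application can be given identical values.
    public static Map<String, Object> sslProps(String truststorePath, String truststorePassword,
                                               String keystorePath, String keystorePassword) {
        Map<String, Object> props = new LinkedHashMap<>();
        props.put("security.protocol", "SSL");
        props.put("ssl.truststore.location", truststorePath);
        props.put("ssl.truststore.password", truststorePassword);
        props.put("ssl.keystore.location", keystorePath);
        props.put("ssl.keystore.password", keystorePassword);
        return props;
    }

    // Returns a copy of the base configuration with the SSL settings layered
    // on top; the base map itself is left unchanged.
    public static Map<String, Object> withSsl(Map<String, Object> base, Map<String, Object> ssl) {
        Map<String, Object> merged = new LinkedHashMap<>(base);
        merged.putAll(ssl);
        return merged;
    }

    public static void main(String[] args) {
        // Paths are the ones from the question's log; passwords are placeholders.
        Map<String, Object> ssl = sslProps(
                "/tmp/kafka_truststore.jks", "truststore-password",
                "/tmp/kafka_keystore.jks", "keystore-password");

        Map<String, Object> adminBase = new LinkedHashMap<>();
        adminBase.put("bootstrap.servers", "kafka-01.aws:9093,kafka-02.aws:9093,kafka-03.aws:9093");

        // Print only the keys so the passwords never reach the console.
        System.out.println(withSsl(adminBase, ssl).keySet());
    }
}
```

The same `sslProps` map could be merged into the Kafka Streams configuration as well, so that both admin clients agree on how to reach the brokers.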

Posted by huangapple on 2023-03-09 17:43:18
Source: https://go.coder-hub.com/75682794.html