Kafka Streams在auto.register.schemas=false情况下无法将消息发送到动态创建的主题。

huangapple go评论64阅读模式
英文:

Kafka streams fails to send messages to dynamically created topics when auto.register.schemas=false

问题

Kafka Streams 动态创建辅助主题,用于操作,如 repartition() 或 KTable 连接,并使用配置的默认序列化器发送消息。
auto.register.schemas=true(默认设置)时,Kafka Streams 发送消息时会注册 Avro 模式。
但是,当 auto.register.schemas=false 时,Kafka Streams 会出现 "Subject not found" 的错误(如另一个 StackOverflow 帖子 中讨论的)。

Confluent 文档 对此问题没有提及,但这是否是预期行为?还是avro4k-kafka-serializer 中的一个 bug(我尚未尝试内置的 SpecificAvroSerializer)?如有必要,我将创建一个最小示例来展示这个问题。

英文:

Kafka Streams dynamically creates auxiliary topics for operations such as repartition() or KTable joins, and sends messages using the configured default serializers.
When auto.register.schemas=true (the default setting), an Avro schema is registered when Kafka Streams sends a message.
But when auto.register.schemas=false, Kafka Streams errors with "Subject not found" (as discussed in another StackOverflow post).

The Confluent documentation is silent on the matter, but is this the expected behaviour? Or is this a bug in avro4k-kafka-serializer (I've yet to try with the built-in SpecificAvroSerializer)? If necessary I'll create a minimal example that exhibits the problem.

答案1

得分: 1

这是由以下原因引起的:

  1. 动态主题被创建。发生的情况是Kafka Streams要求代理为您的应用程序创建正确名称的主题。
  2. 在某个时刻,您决定写入该主题,意味着您开始发布消息。在这一点上,Kafka Streams尝试从Avro Schema Registry中获取模式。如果未能找到与主题匹配的模式(主题是根据您的serde配置确定的),它将尝试注册模式。但在您的情况下,auto.register.schema被设置为false,因此不会尝试注册。因为没有模式,所以会引发错误。

如何修复这个问题:
在启动应用程序之前,通过模式注册表的REST API(https://docs.confluent.io/platform/current/schema-registry/develop/api.html)注册您的模式。

如果您的Avro模式不在*.avsc文件中,您可以轻松生成它:

ReflectData.get().getSchema(YourClass.class).toString();

注意:代码部分不进行翻译。

英文:

This is caused by the following:

  1. The dynamic topic is created. What happens is Kafka Streams asks the broker to create the topic with the right name for your application.
  2. At some point you decide to write to that topic, meaning you start publishing messages. What happens at that point is KafkaStreams attempts to fetch the schema from Avro Schema Registry. If it fails to find a schema matching the subject (the subject is determined based on your serde configuration). It will attempt to register the schema. However in your case auto.register.schema is set to false, so no registration attempt is made. As there is no schema an error is thrown.

How to fix this:
Before starting your application register your schema via REST API of schema registry (https://docs.confluent.io/platform/current/schema-registry/develop/api.html).

If you don't have your avro schem in an *.avsc file, you can easily generate it:

ReflectData.get().getSChema(YourClass.class).toString();

huangapple
  • 本文由 发表于 2023年6月15日 20:21:47
  • 转载请务必保留本文链接:https://go.coder-hub.com/76482427.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定