
Micronaut, Kafka with Avro - Consumer Error: Required argument [EasySchema message] not specified


I am working with Micronaut, Kafka, and Avro.

My producer can send messages to Kafka without any problems. Unfortunately, my consumer cannot read them. The error is:

```
io.micronaut.core.bind.exceptions.UnsatisfiedArgumentException: Required argument [EasySchema message] not specified
```

I generate a class from the following Avro schema:

```json
{
  "type" : "record",
  "name" : "EasySchema",
  "namespace" : "de.mydata.kafka.easy",
  "fields" : [ {
    "name" : "active",
    "type" : "boolean"
  }, {
    "name" : "name",
    "type" : "string"
  } ]
}
```
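For reference, the class the Avro plugin generates from this schema exposes bean-style accessors roughly like the following simplified sketch. The real generated class additionally extends `org.apache.avro.specific.SpecificRecordBase` and carries the schema; this stripped-down version only illustrates the shape of the API used below.

```java
// Simplified sketch of the generated EasySchema class; the real one is
// produced by the Avro Maven/Gradle plugin and extends SpecificRecordBase.
public class EasySchema {
    private boolean active;
    private CharSequence name;

    public boolean getActive() { return active; }
    public void setActive(boolean active) { this.active = active; }
    public CharSequence getName() { return name; }
    public void setName(CharSequence name) { this.name = name; }

    @Override
    public String toString() {
        // Mirrors the JSON-ish form Avro records print in the error log below.
        return "{\"active\": " + active + ", \"name\": \"" + name + "\"}";
    }
}
```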

I use this to send messages without problems; the message arrives in Kafka, as I can see in my GUI:

```java
public void sendMessages() throws Exception {
    EasySchema easy = new EasySchema();
    easy.setActive(true);
    easy.setName("Maria Klausen");
    Random random = new Random();
    String randNumber = Integer.toString(random.nextInt(100000));
    kafkaClientProducer.sendMessage("mykey" + randNumber, easy);
}

@KafkaClient
public interface KafkaClientProducer {
    @Topic("with-easy-topic1")
    void sendMessage(@KafkaKey String key, EasySchema message);
}
```

I currently consume the message like this:

```java
@Topic("with-easy-topic1")
void receive(@KafkaKey String key, EasySchema message) {
    System.out.println(key);
    System.out.println(message);
}
```

I then receive the following error message:

```
ERROR i.m.c.k.e.KafkaListenerExceptionHandler - Error processing record [Optional[ConsumerRecord(topic = with-easy-topic1, partition = 0, leaderEpoch = 0, offset = 2, CreateTime = 1689603813875, serialized key size = 16, serialized value size = 20, headers = RecordHeaders(headers = [], isReadOnly = false), key = mykey73556, value = {"active": true, "name": "Maria Klausen"})]] for Kafka consumer [de.mydata.CustomerMessageConsumer@186701a2] produced error: Required argument [EasySchema message] not specified
io.micronaut.core.bind.exceptions.UnsatisfiedArgumentException: Required argument [EasySchema message] not specified
```

If I use a simple String instead of EasySchema, I don't get an error. What am I doing wrong? Any ideas?

I would like to use EasySchema directly.

My application.yml looks like this:

```yaml
micronaut:
  application:
    name: customerMessageProducer
  server:
    port: 8092
netty:
  default:
    allocator:
      max-order: 3
kafka:
  bootstrap:
    servers: localhost:9092
  producers:
    default:
      key:
        serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
      value:
        serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
      schema:
        registry:
          url: http://127.0.0.1:8081
  consumers:
    default:
      key:
        deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
      value:
        deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
      schema:
        registry:
          url: http://127.0.0.1:8081
```

Answer 1

Score: 1


The first issue seems like a bug, but Micronaut also documents this approach of receiving the whole ConsumerRecord. Alternatively, you could try adding the @MessageBody annotation:

```java
@Topic("...")
void receive(ConsumerRecord<String, EasySchema> record) {
    String key = record.key();
    EasySchema value = record.value();
}
```
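The @MessageBody variant mentioned above would keep the original two-argument signature. This is a sketch only; the listener class name is taken from the question's error log, and the code requires the Micronaut Kafka dependencies to run.

```java
import io.micronaut.configuration.kafka.annotation.KafkaKey;
import io.micronaut.configuration.kafka.annotation.KafkaListener;
import io.micronaut.configuration.kafka.annotation.Topic;
import io.micronaut.messaging.annotation.MessageBody;

import de.mydata.kafka.easy.EasySchema;

@KafkaListener
public class CustomerMessageConsumer {

    @Topic("with-easy-topic1")
    void receive(@KafkaKey String key, @MessageBody EasySchema message) {
        // Explicitly marking the value parameter can resolve the
        // UnsatisfiedArgumentException on the binding side.
        System.out.println(key);
        System.out.println(message);
    }
}
```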

Secondly, you need to tell the Confluent deserializer to produce your specific record type; without this flag, KafkaAvroDeserializer returns a GenericRecord, which cannot be bound to the EasySchema parameter:

```yaml
kafka:
  consumers:
    default:
      specific.avro.reader: true
```
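Merged into the application.yml from the question, the consumer section would then read as follows (a sketch of the combined configuration):

```yaml
kafka:
  consumers:
    default:
      specific.avro.reader: true
      key:
        deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
      value:
        deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
      schema:
        registry:
          url: http://127.0.0.1:8081
```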

huangapple
  • Published on 2023-07-17 22:47:36
  • Please keep this link when reposting: https://go.coder-hub.com/76705639.html
  • Tags: apache-kafka, avro, deserialization, java, micronaut
