Micronaut, Kafka with Avro - Consumer Error: Required argument [EasySchema message] not specified

Question


I am working with Micronaut, Kafka, and Avro.

My producer sends messages to Kafka without any problems, but my consumer cannot read them correctly. The error is:
io.micronaut.core.bind.exceptions.UnsatisfiedArgumentException: Required argument [EasySchema message] not specified

I generate a class from the following Avro schema:

{
  "type" : "record",
  "name" : "EasySchema",
  "namespace" : "de.mydata.kafka.easy",
  "fields" : [ {
    "name" : "active",
    "type" : "boolean"
  }, {
    "name" : "name",
    "type" : "string"
  } ]
}

I use the generated class to send messages without problems. The messages arrive in Kafka, as I can see in my GUI:

    public void sendMessages() throws Exception {
        EasySchema easy = new EasySchema();
        easy.setActive(true);
        easy.setName("Maria Klausen");
        // Append a random suffix so each message gets a distinct key.
        Random random = new Random();
        String randNumber = Integer.toString(random.nextInt(100000));
        kafkaClientProducer.sendMessage("mykey" + randNumber, easy);
    }

    @KafkaClient
    public interface KafkaClientProducer {
        @Topic("with-easy-topic1")
        void sendMessage(@KafkaKey String key, EasySchema message);
    }

I currently consume the message like this:

    @Topic("with-easy-topic1")
    void receive(@KafkaKey String key, EasySchema message) {
        System.out.println(key);
        System.out.println(message);
    }

I then receive the following error message:

ERROR i.m.c.k.e.KafkaListenerExceptionHandler - Error processing record [Optional[ConsumerRecord(topic = with-easy-topic1, partition = 0, leaderEpoch = 0, offset = 2, CreateTime = 1689603813875, serialized key size = 16, serialized value size = 20, headers = RecordHeaders(headers = [], isReadOnly = false), key = mykey73556, value = {"active": true, "name": "Maria Klausen"})]] for Kafka consumer [de.mydata.CustomerMessageConsumer@186701a2] produced error: Required argument [EasySchema message] not specified
io.micronaut.core.bind.exceptions.UnsatisfiedArgumentException: Required argument [EasySchema message] not specified

If I use a plain String instead of EasySchema, I don't get an error. What am I doing wrong? Any ideas?

I would like to use EasySchema directly.

My application.yml looks like this:

micronaut:
  application:
    name: customerMessageProducer
  server:
    port: 8092
netty:
  default:
    allocator:
      max-order: 3
kafka:
  bootstrap:
    servers: localhost:9092
  producers:
    default:
      key:
        serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
      value:
        serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
      schema:
        registry:
          url: http://127.0.0.1:8081
  consumers:
    default:
      key:
        deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
      value:
        deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
      schema:
        registry:
          url: http://127.0.0.1:8081

Answer 1

Score: 1


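The first issue looks like a bug, but Micronaut also documents receiving the whole ConsumerRecord as the listener argument, as shown below. Alternatively, you could try adding the @MessageBody annotation to the EasySchema parameter.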

@Topic("...")
void receive(ConsumerRecord<String, EasySchema> record) {
  String key = record.key();
  EasySchema value = record.value();
}

Second, you need to tell the Confluent deserializer to return the specific record type (the generated class) rather than a generic record:

kafka:
    consumers:
        default:
            specific.avro.reader: true
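
To illustrate why this setting likely matters, here is a minimal, stdlib-only sketch. It does not use the Confluent or Micronaut APIs: the EasySchema class, deserialize, and bind below are hypothetical stand-ins, and the binding rule is simplified to a plain type check. The assumption is that without specific.avro.reader the deserializer hands back a generic, untyped record, which cannot be bound to a parameter declared as EasySchema.

```java
import java.util.Map;
import java.util.Optional;

public class SpecificReaderSketch {

    // Hypothetical stand-in for the class generated from the Avro schema.
    static final class EasySchema {
        final boolean active;
        final String name;

        EasySchema(boolean active, String name) {
            this.active = active;
            this.name = name;
        }
    }

    // Simulated deserializer (an assumption, not the Confluent API):
    // in generic mode it returns an untyped field map,
    // in specific mode it returns the generated class.
    static Object deserialize(boolean specificAvroReader) {
        if (specificAvroReader) {
            return new EasySchema(true, "Maria Klausen");
        }
        return Map.of("active", true, "name", "Maria Klausen");
    }

    // Simulated argument binding (simplified): the value is usable
    // only if it already has the declared parameter type.
    static <T> Optional<T> bind(Object value, Class<T> declaredType) {
        return declaredType.isInstance(value)
                ? Optional.of(declaredType.cast(value))
                : Optional.empty();
    }

    public static void main(String[] args) {
        // Generic mode: the value is not an EasySchema, so binding fails.
        System.out.println(bind(deserialize(false), EasySchema.class).isPresent()); // prints false
        // Specific mode: the value is an EasySchema, so binding succeeds.
        System.out.println(bind(deserialize(true), EasySchema.class).isPresent());  // prints true
    }
}
```

In the sketch, the empty result in generic mode plays the role of the "Required argument [EasySchema message] not specified" error from the question. It would also be consistent with the record's value printing correctly in the log even though the typed parameter could not be filled.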
