Micronaut, Kafka with Avro - Consumer Error: Required argument [EasySchema message] not specified
Question
I am working with Micronaut, Kafka, and Avro.
My producer sends messages to Kafka without any problems. Unfortunately, my consumer cannot read them correctly. The error is:
io.micronaut.core.bind.exceptions.UnsatisfiedArgumentException: Required argument [EasySchema message] not specified
I generate a class from the following Avro schema:
{
  "type" : "record",
  "name" : "EasySchema",
  "namespace" : "de.mydata.kafka.easy",
  "fields" : [ {
    "name" : "active",
    "type" : "boolean"
  }, {
    "name" : "name",
    "type" : "string"
  } ]
}
I use the generated class to send messages without problems. The messages arrive in Kafka; I can see them in my GUI.
public void sendMessages() throws Exception {
    EasySchema easy = new EasySchema();
    easy.setActive(true);
    easy.setName("Maria Klausen");
    Random random = new Random();
    random.nextInt(1000);
    String randNumber = Integer.toString(random.nextInt(100000));
    kafkaClientProducer.sendMessage("mykey" + randNumber, easy);
}

@KafkaClient
public interface KafkaClientProducer {
    @Topic("with-easy-topic1")
    void sendMessage(@KafkaKey String key, EasySchema message);
}
I currently consume the message like this:
@Topic("with-easy-topic1")
void receive(@KafkaKey String key, EasySchema message) {
    System.out.println(key);
    System.out.println(message);
}
I then receive the following error message:
ERROR i.m.c.k.e.KafkaListenerExceptionHandler - Error processing record [Optional[ConsumerRecord(topic = with-easy-topic1, partition = 0, leaderEpoch = 0, offset = 2, CreateTime = 1689603813875, serialized key size = 16, serialized value size = 20, headers = RecordHeaders(headers = [], isReadOnly = false), key = mykey73556, value = {"active": true, "name": "Maria Klausen"})]] for Kafka consumer [de.mydata.CustomerMessageConsumer@186701a2] produced error: Required argument [EasySchema message] not specified
io.micronaut.core.bind.exceptions.UnsatisfiedArgumentException: Required argument [EasySchema message] not specified
If I use a plain String instead of EasySchema, I don't get an error. What am I doing wrong? Any ideas?
I would like to use EasySchema directly.
My application.yml looks like this:
micronaut:
  application:
    name: customerMessageProducer
  server:
    port: 8092
netty:
  default:
    allocator:
      max-order: 3
kafka:
  bootstrap:
    servers: localhost:9092
  producers:
    default:
      key:
        serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
      value:
        serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
      schema:
        registry:
          url: http://127.0.0.1:8081
  consumers:
    default:
      key:
        deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
      value:
        deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
      schema:
        registry:
          url: http://127.0.0.1:8081
Answer 1
Score: 1
The first issue looks like a bug, although Micronaut also documents receiving the full ConsumerRecord, as in the listener below. Alternatively, you could try adding the @MessageBody annotation to the message parameter.
@Topic("...")
void receive(ConsumerRecord<String, EasySchema> record) {
    String key = record.key();
    EasySchema value = record.value();
}
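Keep in mind that the generic type on ConsumerRecord is erased at runtime, so this signature accepts whatever object the deserializer produced. The sketch below illustrates that point with stand-in classes only: FakeRecord, GenericValue, the nested EasySchema, and the deserialize helper are all hypothetical and are not the real Kafka, Avro, or Micronaut types. It is meant to show why reading the value as EasySchema can only work if the deserializer actually built an EasySchema.

```java
// Stand-ins for illustration only; none of these are real Kafka/Avro classes.
final class ErasureDemo {

    /** Plays the role of ConsumerRecord<K, V>. */
    static final class FakeRecord<K, V> {
        private final K key;
        private final V value;

        FakeRecord(K key, V value) {
            this.key = key;
            this.value = value;
        }

        K key() { return key; }
        V value() { return value; }
    }

    /** Plays the role of a generic (schema-driven) Avro record. */
    static final class GenericValue {
        @Override
        public String toString() {
            return "{\"active\": true, \"name\": \"Maria Klausen\"}";
        }
    }

    /** Plays the role of the generated EasySchema class. */
    static final class EasySchema {
        final boolean active;
        final String name;

        EasySchema(boolean active, String name) {
            this.active = active;
            this.name = name;
        }
    }

    /** Hypothetical deserializer: generic object by default, specific class on request. */
    static Object deserialize(boolean specificReader) {
        if (specificReader) {
            return new EasySchema(true, "Maria Klausen");
        }
        return new GenericValue();
    }

    /** Hands out a record typed as <String, EasySchema> regardless of the real value type. */
    @SuppressWarnings("unchecked")
    static FakeRecord<String, EasySchema> poll(boolean specificReader) {
        FakeRecord<String, ?> raw = new FakeRecord<>("mykey1", deserialize(specificReader));
        // Unchecked cast: nothing is verified here because of type erasure.
        return (FakeRecord<String, EasySchema>) raw;
    }

    static String describe(boolean specificReader) {
        FakeRecord<String, EasySchema> record = poll(specificReader);
        try {
            // The compiler inserts the real type check at this assignment.
            EasySchema value = record.value();
            return "ok: " + value.name;
        } catch (ClassCastException e) {
            return "ClassCastException";
        }
    }

    public static void main(String[] args) {
        System.out.println(describe(false)); // generic value cannot be read as EasySchema
        System.out.println(describe(true));  // specific value can
    }
}
```

In this toy model, the ConsumerRecord-style signature alone does not make the value an EasySchema; the deserializer still has to produce one.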
Secondly, you need to tell the Confluent deserializer to produce the specific record type (the generated EasySchema class) rather than a generic record:
kafka:
  consumers:
    default:
      specific.avro.reader: true
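For reference, the Confluent KafkaAvroDeserializer returns generic Avro records unless specific.avro.reader is set to true, which is the likely reason the EasySchema argument could not be bound. The sketch below spells out the flat property names a plain Kafka consumer would receive, assuming the entries under kafka.consumers.default are passed through to the underlying consumer unchanged. The class name ConsumerProps and the group.id value are hypothetical; the URLs and deserializer class come from the question.

```java
import java.util.Properties;

final class ConsumerProps {

    /** Builds plain Kafka consumer properties mirroring the YAML settings. */
    static Properties build() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        // Hypothetical group id, chosen only for this illustration.
        props.setProperty("group.id", "easy-schema-demo");
        props.setProperty("key.deserializer",
                "io.confluent.kafka.serializers.KafkaAvroDeserializer");
        props.setProperty("value.deserializer",
                "io.confluent.kafka.serializers.KafkaAvroDeserializer");
        props.setProperty("schema.registry.url", "http://127.0.0.1:8081");
        // Ask the Confluent deserializer for generated classes instead of generic records.
        props.setProperty("specific.avro.reader", "true");
        return props;
    }

    public static void main(String[] args) {
        Properties props = build();
        System.out.println("specific.avro.reader = "
                + props.getProperty("specific.avro.reader"));
    }
}
```

These properties could be handed to a KafkaConsumer when debugging outside Micronaut, to check whether the deserializer then yields EasySchema instances.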