Unable to use schemaRegistry in reactor kafka

Question
I am trying to set up a Kafka consumer using Reactor Kafka. The producer is integrated with the Kafka schema registry:
@Value("${spring.kafka.schemaRegistryUrls}")
private String schemaRegistryEnvVarValue;

@Bean
public ReceiverOptions<String, MyProto> kafkaReceiverOptionsFloor(
        KafkaProperties kafkaProperties) {
    final Map<String, Object> kafkaConsumerProperties =
        kafkaProperties.buildConsumerProperties();
    for (Map.Entry<String, KafkaProperties.Consumer> entry :
            kafkaConsumerPropertiesConfig.getConsumer().entrySet()) {
        if (kafkaTopics.contains(entry.getKey())) {
            kafkaConsumerProperties.putAll(entry.getValue().buildProperties());
        }
    }
    kafkaConsumerProperties.put("schema.registry.url", schemaRegistryEnvVarValue);

    final ReceiverOptions<String, MyProto> basicReceiverOptions =
        ReceiverOptions.<String, MyProto>create(kafkaConsumerProperties)
            .withValueDeserializer(new MyProtoDeserializer())
            // disabling auto commit, since we are managing committing once
            // record is processed
            .commitInterval(Duration.ZERO)
            .commitBatchSize(0);
    kafkaConsumerProperties.forEach((k, v) -> log.debug("k2 {} v2 {}", k, v));
    return basicReceiverOptions
        .subscription(kafkaTopicsFloor)
        .addAssignListener(partitions -> log.debug("onPartitionsAssigned {}", partitions))
        .addRevokeListener(partitions -> log.debug("onPartitionsRevoked {}", partitions));
}

@Bean
public ReactiveKafkaConsumerTemplate<String, MyProto> reactiveKafkaConsumerTemplate(
        ReceiverOptions<String, MyProto> kafkaReceiverOptions) {
    return new ReactiveKafkaConsumerTemplate<>(kafkaReceiverOptions);
}
I am getting an exception: "Protocol message contained an invalid tag (zero)".
It is able to parse in my unit tests (without the schema registry).
It looks like the schema registry is not being used. What am I doing wrong here?
The deserializer looks like this:
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.common.serialization.Deserializer;

@Slf4j
public class MyProtoDeserializer implements Deserializer<MyProto> {

    public MyProtoDeserializer() {}

    /**
     * Deserializes the data to MyProto from a byte array.
     *
     * @param topic the topic the record was received from
     * @param data the serialized bytes
     * @return the parsed MyProto, or the default instance on failure
     */
    @Override
    public MyProto deserialize(final String topic, final byte[] data) {
        if (data == null) {
            return null;
        }
        // TODO: Use schema registry and kpow
        try {
            return MyProto.getDefaultInstance()
                .getParserForType()
                .parseFrom(data);
        } catch (Exception ex) {
            log.debug("Exception in MyProto parse {}", ex.getMessage());
            return MyProto.getDefaultInstance();
        }
    }
}
Answer 1
Score: 1
Reactor isn't the issue. schema.registry.url is only a property of the Confluent deserializer classes. You are not implementing the configure method in your Deserializer, so that property is ignored. Similarly, directly calling parseFrom doesn't use any HTTP client to interact with a registry.
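This also explains the exact error message. Records produced with a registry-aware Confluent serializer start with a 5-byte header: magic byte 0x0 plus a 4-byte schema id (and, for Protobuf, a message-index list after that). Handing the raw bytes to the Protobuf parser makes it read the leading 0x0 as field tag zero, which is invalid. A minimal sketch of that framing (the class name and sample bytes below are illustrative, not from the original post):

```java
import java.nio.ByteBuffer;

// Sketch of the Confluent wire format: magic byte 0x0, then a 4-byte
// schema id, then the payload. Calling MyProto.parseFrom(data) on the
// raw record feeds the 0x0 magic byte to the Protobuf parser, which
// reads it as field tag zero -> "Protocol message contained an invalid
// tag (zero)".
public class WireFormatSketch {
    static final byte MAGIC_BYTE = 0x0;

    // Strip the Confluent framing and return only the payload bytes.
    // (Illustration only; for Protobuf the real KafkaProtobufDeserializer
    // also reads a message-index list after the schema id.)
    static byte[] stripHeader(byte[] record) {
        ByteBuffer buf = ByteBuffer.wrap(record);
        if (buf.get() != MAGIC_BYTE) {
            throw new IllegalArgumentException("Unknown magic byte");
        }
        int schemaId = buf.getInt(); // 4-byte schema id from the registry
        byte[] payload = new byte[buf.remaining()];
        buf.get(payload);
        return payload;
    }

    public static void main(String[] args) {
        // Fake framed record: magic byte, schema id 7, then 3 payload bytes.
        byte[] framed = ByteBuffer.allocate(5 + 3)
                .put(MAGIC_BYTE)
                .putInt(7)
                .put(new byte[] {0x08, 0x01, 0x10}) // arbitrary sample bytes
                .array();
        byte[] payload = stripHeader(framed);
        System.out.println(payload.length); // 3
    }
}
```

The point is not to hand-roll this stripping yourself: the Confluent deserializer handles the header, the message indexes, and the registry lookup for you.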
Import the library rather than writing your own:
https://mvnrepository.com/artifact/io.confluent/kafka-protobuf-serializer/7.4.0
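For reference, a Maven dependency sketch for that artifact (note that Confluent artifacts are published to Confluent's own Maven repository, https://packages.confluent.io/maven/, which may need to be added to the build):

```xml
<!-- Confluent Protobuf (de)serializer, version taken from the link above -->
<dependency>
  <groupId>io.confluent</groupId>
  <artifactId>kafka-protobuf-serializer</artifactId>
  <version>7.4.0</version>
</dependency>
```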
Also, this is how to auto-configure Spring Boot with that property:

spring:
  kafka:
    consumer:
      value-deserializer: io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializer
      properties:
        "[schema.registry.url]": http://...

Reference: https://docs.spring.io/spring-boot/docs/current/reference/html/messaging.html#messaging.kafka.additional-properties
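Applied to the question's Reactor setup, this could look roughly like the sketch below. It is a sketch under assumptions, not a definitive implementation: the topic name is a placeholder, MyProto is assumed to be the generated Protobuf class, and KafkaProtobufDeserializerConfig.SPECIFIC_PROTOBUF_VALUE_TYPE tells the Confluent deserializer which generated class to return instead of a DynamicMessage:

```java
import io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializer;
import io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializerConfig;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import reactor.kafka.receiver.ReceiverOptions;

@Bean
public ReceiverOptions<String, MyProto> kafkaReceiverOptionsFloor(
        KafkaProperties kafkaProperties) {
    Map<String, Object> props = kafkaProperties.buildConsumerProperties();
    // Configure the Confluent deserializer by class name so the consumer
    // instantiates it and calls its configure() with these properties,
    // including schema.registry.url.
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
            KafkaProtobufDeserializer.class);
    props.put("schema.registry.url", schemaRegistryEnvVarValue);
    // Return the generated MyProto class instead of a DynamicMessage.
    props.put(KafkaProtobufDeserializerConfig.SPECIFIC_PROTOBUF_VALUE_TYPE,
            MyProto.class.getName());

    return ReceiverOptions.<String, MyProto>create(props)
        // No withValueDeserializer(...) here: the question's pre-built
        // MyProtoDeserializer never sees the properties because it does
        // not implement configure().
        .commitInterval(Duration.ZERO) // manual commits, as in the question
        .commitBatchSize(0)
        .subscription(List.of("my-topic")); // placeholder topic name
}
```

The key design change is configuring the deserializer via properties instead of passing an instance, so the schema-registry settings actually reach it.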