Unable to use Schema Registry in Reactor Kafka

Question

I am trying to set up a Kafka consumer using Reactor Kafka. The producer is integrated with the Kafka Schema Registry.



    @Value("${spring.kafka.schemaRegistryUrls}")
    private String schemaRegistryEnvVarValue;

    @Bean
    public ReceiverOptions<String, MyProto> kafkaReceiverOptionsFloor(
            KafkaProperties kafkaProperties) {

        final Map<String, Object> kafkaConsumerProperties =
                kafkaProperties.buildConsumerProperties();
        for (Map.Entry<String, KafkaProperties.Consumer> entry :
                kafkaConsumerPropertiesConfig.getConsumer().entrySet()) {
            if (kafkaTopics.contains(entry.getKey())) {
                kafkaConsumerProperties.putAll(entry.getValue().buildProperties());
            }
        }
        kafkaConsumerProperties.put("schema.registry.url", schemaRegistryEnvVarValue);
        final ReceiverOptions<String, MyProto> basicReceiverOptions =
                ReceiverOptions.<String, MyProto>create(kafkaConsumerProperties)
                        .withValueDeserializer(new MyProtoDeserializer())
                        // disabling auto commit, since we are managing committing
                        // once the record is processed
                        .commitInterval(Duration.ZERO)
                        .commitBatchSize(0);

        kafkaConsumerProperties.forEach((k, v) -> log.debug("k2 {} v2 {}", k, v));

        return basicReceiverOptions
                .subscription(kafkaTopicsFloor)
                .addAssignListener(partitions -> log.debug("onPartitionsAssigned {}", partitions))
                .addRevokeListener(partitions -> log.debug("onPartitionsRevoked {}", partitions));
    }

    @Bean
    public ReactiveKafkaConsumerTemplate<String, MyProto> reactiveKafkaConsumerTemplate(
            ReceiverOptions<String, MyProto> kafkaReceiverOptions) {
        return new ReactiveKafkaConsumerTemplate<>(kafkaReceiverOptions);
    }

I am getting the exception "Protocol message contained an invalid tag (zero)".
The same message parses fine in my unit tests (without the Schema Registry).

It looks like the Schema Registry is not being used. What am I doing wrong here?

The deserializer looks like this:

@Slf4j
public class MyProtoDeserializer implements Deserializer<MyProto> {
    public MyProtoDeserializer() {}

    /**
     * Deserializes the data to my_proto from byte array.
     *
     * @param topic
     * @param data
     * @return
     */
    @Override
    public MyProto deserialize(final String topic, final byte[] data) {
        if (data == null) {
            return null;
        }
        // TODO: Use schemaregistry and kpow
      
        try {
            return MyProto.getDefaultInstance()
                    .getParserForType()
                    .parseFrom(data);
        } catch (Exception ex) {
            log.debug("Exception in MyProto parse {}", ex.getMessage());
            return MyProto.getDefaultInstance();
        }
    }
}

Answer 1

Score: 1

Reactor isn't the issue.

schema.registry.url is only a property of the Confluent deserializer classes. Your Deserializer does not implement the configure method, so that property is ignored. Similarly, calling parseFrom directly does not use any HTTP client to talk to the registry.
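
For context on the "invalid tag (zero)" error: records written by Confluent's serializers are framed with a zero "magic" byte and a 4-byte schema ID (and, for Protobuf, a message-index list) ahead of the actual message bytes, so parseFrom sees that leading zero where it expects a field tag. The following is only a diagnostic sketch, assuming the producer uses Confluent's serializer; ConfluentFraming is a hypothetical helper name, not part of any library.

```java
import java.nio.ByteBuffer;

/** Hypothetical diagnostic helper; not part of Kafka or the Confluent libraries. */
public final class ConfluentFraming {
    /** Confluent-framed payloads start with a single zero "magic" byte. */
    private static final byte MAGIC_BYTE = 0x0;

    private ConfluentFraming() {}

    /** True if the payload starts with the magic byte and has room for a schema ID. */
    public static boolean looksFramed(byte[] data) {
        return data != null && data.length >= 5 && data[0] == MAGIC_BYTE;
    }

    /** Reads the 4-byte big-endian schema ID that follows the magic byte. */
    public static int schemaId(byte[] data) {
        if (!looksFramed(data)) {
            throw new IllegalArgumentException("Payload does not look Confluent-framed");
        }
        return ByteBuffer.wrap(data, 1, 4).getInt();
    }
}
```

Logging looksFramed(data) inside the custom deserializer would show whether the incoming bytes carry this header, which would explain why the unit tests (raw Protobuf bytes, no registry) parse fine.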

Import the library rather than writing your own:

https://mvnrepository.com/artifact/io.confluent/kafka-protobuf-serializer/7.4.0
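
With that dependency on the classpath, the consumer properties could be assembled roughly as below. This is a sketch under assumptions: ProtobufConsumerProps and its method name are made up for illustration, and the "specific.protobuf.value.type" key is, as far as I know, the Confluent setting that makes the deserializer return the generated class rather than a generic DynamicMessage.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper that layers the Confluent Protobuf settings onto existing consumer properties. */
public final class ProtobufConsumerProps {
    private ProtobufConsumerProps() {}

    public static Map<String, Object> withConfluentProtobuf(
            Map<String, Object> base, String schemaRegistryUrl, String valueTypeClassName) {
        // Copy so the caller's map is left untouched.
        Map<String, Object> props = new HashMap<>(base);
        // Let Kafka instantiate the deserializer by class name so that configure() is called on it.
        props.put("value.deserializer",
                "io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializer");
        props.put("schema.registry.url", schemaRegistryUrl);
        // Assumption: fully qualified name of the generated Protobuf class, e.g. MyProto.
        props.put("specific.protobuf.value.type", valueTypeClassName);
        return props;
    }
}
```

The resulting map can be passed to ReceiverOptions.create(...). The withValueDeserializer(new MyProtoDeserializer()) call should then be dropped, since a deserializer instance passed in directly is not configured from these properties.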

Also, this is how to auto-configure Spring Boot with that property:

spring:
  kafka:
    consumer:
      value-deserializer: io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializer
      properties:
        "[schema.registry.url]": http://...

Reference: https://docs.spring.io/spring-boot/docs/current/reference/html/messaging.html#messaging.kafka.additional-properties

Posted by huangapple on 2023-08-09 01:06:17
Source: https://go.coder-hub.com/76861776.html