How to use avro native decoder when using a PollableMessageSource input in spring cloud stream?

Question


I'm using a PollableMessageSource input to read from a Kafka topic. Messages on that topic are in Avro. use-native-decoding was set to true when those messages were published.

This is how I'm polling:

pollableChannels.inputChannel().poll(this::processorMethodName,
        new ParameterizedTypeReference<TypeClassName>() {
        });

pollableChannels is just an injected instance of this interface:

public interface PollableChannels {
  @Input("input-channel")
  PollableMessageSource inputChannel();
}

After noticing that TypeClassName was not being populated properly (its nested objects were incorrectly set to null), I started debugging the poll method. I found that it relies on the contentType header to select a converter. Since that header is not set (because the messages were encoded natively), it falls back to ApplicationJsonMessageMarshallingConverter, which is clearly not the right choice.

If I use a regular @StreamListener, the use-native-decoding config property is honored, so the messages seem to be published correctly.

Therefore, my primary question is: how do I force native decoding when using pollable consumers?
My broader question is whether properties under spring.cloud.stream.bindings.channel-name.consumer are respected at all when using a pollable consumer.

Spring cloud stream version: 2.2.0.RELEASE
Spring Kafka: 2.2.5.RELEASE
Confluent version for the serializer: 5.2.1

Update:

Relevant config:

spring:
  cloud.stream:
    bindings:  
      input-channel:
        content-type: application/*+avro
        destination: "topic-name"
        group: "group-name"
        consumer:
          partitioned: true
          concurrency: 3
          max-attempts: 1
          use-native-decoding: true
    kafka:
      binder:
        configuration:
          key.serializer: org.apache.kafka.common.serialization.StringSerializer
          value.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
          key.deserializer: org.apache.kafka.common.serialization.StringDeserializer
          value.deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer

Answer 1

Score: 2


The ParameterizedTypeReference argument is intended to help the message converter convert the payload to the required type. When using native decoding, the "conversion" is done by the deserializer instead, so no further conversion is necessary.

So, just remove the second argument to the poll() method and conversion will be skipped.
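A minimal sketch of that idea, kept free of Spring so it runs on its own. The nested `Message` class and the static `poll` method are hypothetical stand-ins that only mirror the shape of a one-argument `poll(handler)` call, and `Customer` stands in for the Avro-generated class; none of these names come from the framework.

```java
import java.util.function.Consumer;

public class PollWithoutTypeSketch {

    /** Hypothetical stand-in for an Avro-generated class produced by the deserializer. */
    static final class Customer {
        final String name;
        Customer(String name) { this.name = name; }
    }

    /** Hypothetical stand-in for a message wrapper; only the payload is modelled. */
    static final class Message {
        private final Object payload;
        Message(Object payload) { this.payload = payload; }
        Object getPayload() { return payload; }
    }

    /**
     * Hypothetical stand-in for a one-argument poll: the value the deserializer
     * produced is handed to the handler unchanged, with no conversion step.
     * In the question's code the equivalent call would be
     * pollableChannels.inputChannel().poll(this::processorMethodName);
     */
    static boolean poll(Object deserializedValue, Consumer<Message> handler) {
        if (deserializedValue == null) {
            return false; // nothing was received
        }
        handler.accept(new Message(deserializedValue));
        return true;
    }

    /** Handler that casts the payload instead of asking for a conversion. */
    static String processorMethodName(Message message) {
        Customer customer = (Customer) message.getPayload();
        return customer.name;
    }

    public static void main(String[] args) {
        StringBuilder seen = new StringBuilder();
        boolean received = poll(new Customer("alice"),
                msg -> seen.append(processorMethodName(msg)));
        System.out.println(received + " " + seen); // prints "true alice"
    }
}
```

The point of the sketch is only that the handler receives the already-deserialized object and casts it; no target type is passed in, so nothing tries to pick a converter.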

That said, starting with version 3.0.8 (and Spring Framework 5.2.9), the conversion is a no-op, as can be seen in the example below.

However, it's still more efficient to omit the argument to avoid any attempt at conversion.

else if (targetClass.isInstance(payload)) {
	return payload;
}
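To make the effect of that check concrete, here is a small standalone sketch. The `convert` method is a hypothetical stand-in for a converter's conversion step, not the framework's actual code, and `Order` is an assumed payload class. It only shows that when the payload is already an instance of the target class, the very same object comes back.

```java
public class NoOpConversionSketch {

    /** Hypothetical payload type standing in for a natively decoded record. */
    static final class Order {
        final String id;
        Order(String id) { this.id = id; }
    }

    /**
     * Hypothetical stand-in for a converter's conversion step.
     * If the payload already has the target type, it is returned untouched.
     */
    static Object convert(Object payload, Class<?> targetClass) {
        if (payload == null) {
            return null;
        }
        else if (targetClass.isInstance(payload)) {
            return payload; // no-op: same instance, nothing is parsed or copied
        }
        // A real converter would attempt an actual conversion here (for example, parsing JSON).
        throw new UnsupportedOperationException("real conversion is not modelled in this sketch");
    }

    public static void main(String[] args) {
        Order decoded = new Order("42");          // what the deserializer produced
        Object result = convert(decoded, Order.class);
        System.out.println(result == decoded);    // prints "true"
    }
}
```

A raw byte[] payload would not pass the `isInstance` check, which is the case where a real converter has to do actual work.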

I just tested it without any problems (tested on 3.0.8, but I don't believe there have been any changes in this area). In fact, you don't even need useNativeDecoding for this case.

public class Foo {

	private String bar;

	public Foo() {
	}

	public Foo(String bar) {
		this.bar = bar;
	}

	public String getBar() {
		return this.bar;
	}

	public void setBar(String bar) {
		this.bar = bar;
	}

	@Override
	public String toString() {
		return "Foo [bar=" + this.bar + "]";
	}

}


@SpringBootApplication
@EnableBinding(Polled.class)
public class So64554618Application {

	public static void main(String[] args) {
		SpringApplication.run(So64554618Application.class, args);
	}

	@Autowired
	PollableMessageSource source;


	@Bean
	public ApplicationRunner runner(KafkaTemplate<byte[], byte[]> template) {
		return args -> {
			template.send("input", "{\"bar\":\"baz\"}".getBytes());
			Thread.sleep(5_000);
			source.poll(msg -> {
				System.out.println(msg);
			}, new ParameterizedTypeReference<Foo>() { });
		};
	}

}

interface Polled {

	@Input
	PollableMessageSource input();

}

#spring.cloud.stream.bindings.input.consumer.use-native-decoding=true
spring.cloud.stream.bindings.input.group=foo
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.trusted.packages=*
spring.kafka.consumer.properties.spring.json.value.default.type=com.example.demo.Foo

GenericMessage [payload=Foo [bar=baz], headers={kafka_offset=2, ...

huangapple
  • Posted on October 27, 2020 at 20:38:21
  • When reposting, please keep the link to this article: https://go.coder-hub.com/64554618.html