需要在模式注册表下为Kafka流的changelog主题注册模式吗?

huangapple go评论114阅读模式
英文:

Do i need to register schema for kafka stream changelog topic under schema registry?

问题

我正在使用Processor API和Kafka StreamDSL实现Kafka流项目。
我的处理器中的处理函数是:

  1. @Override
  2. public void process(final String key, final T event) {
  3. keyValueStore.put(key, event);
  4. }

我的拓扑是:

  1. protected Topology buildTopology() {
  2. final StreamsBuilder builder = new StreamsBuilder();
  3. KeyValueBytesStoreSupplier storeSupplier = Stores.persistentKeyValueStore(stateStoreName);
  4. StoreBuilder<KeyValueStore<String, T>> storeBuilder =
  5. Stores.keyValueStoreBuilder(storeSupplier, Serdes.String(), Serdes.serdeFrom(
  6. new EventSerializer(streamProperties()),
  7. new EventDeserializer(streamProperties())));
  8. builder.addStateStore(storeBuilder);
  9. final KStream<String, T> stream = builder.stream(inputTopic);
  10. stream.process(() -> new Processor<> (stateStoreName), stateStoreName);
  11. stream.to(outputTopic);
  12. return builder.build();
  13. }

最后,这是我的自定义EventSerializer类:

  1. public class EventSerializer<T extends SpecificRecordBase & SpecificRecord>
  2. implements Serializer<T> {
  3. private final KafkaAvroSerializer inner;
  4. public EventSerializer(Map<String, ?> properties) {
  5. inner = new KafkaAvroSerializer();
  6. configure(properties, false);
  7. }
  8. @Override
  9. public void configure(Map<String, ?> configs, boolean isKey) {
  10. inner.configure(EventSerdeConfig.withProducerConfig(configs), isKey);
  11. }
  12. @Override
  13. public byte[] serialize(final String topic, final T record) {
  14. return inner.serialize(topic, record);
  15. }
  16. }

当处理器将事件放入keyValueStore时,我遇到了错误io.confluent.rest.exceptions.RestNotFoundException: Subject not found. 经过一段时间的调试,我意识到这是因为在序列化事件时序列化器出现了问题。函数public byte[] serialize(final String topic, final T record) 中的主题是application id-store-changelog。尽管我不知道原因,但这是Kafka的内部行为。序列化器无法找到此组合主题的模式,因此抛出错误。
我是否需要为此组合主题注册模式,或者是否有办法将已注册模式的真实消费者主题传递给序列化器?

英文:

i am implementing kafka stream project using Processor API and Kafka StreamDSL.
My process function in processor is

  1. @Override
  2. public void process(final String key, final T event) {
  3. keyValueStore.put(key, event);
  4. }

My Topology is

  1. protected Topology buildTopology() {
  2. final StreamsBuilder builder = new StreamsBuilder();
  3. KeyValueBytesStoreSupplier storeSupplier = Stores.persistentKeyValueStore(stateStoreName);
  4. StoreBuilder&lt;KeyValueStore&lt;String, T&gt;&gt; storeBuilder =
  5. Stores.keyValueStoreBuilder(storeSupplier, Serdes.String(), Serdes.serdeFrom(
  6. new EventSerializer(streamProperties()),
  7. new EventDeserializer(streamProperties())));
  8. builder.addStateStore(storeBuilder);
  9. final KStream&lt;String, T&gt; stream = builder.stream(inputTopic);
  10. stream.process(() -&gt; new Processor&lt;&gt;(stateStoreName), stateStoreName);
  11. stream.to(outputTopic);
  12. return builder.build();
  13. }

And Lastly this is my custom EventSerializer class:

  1. public class EventSerializer&lt;T extends SpecificRecordBase &amp; SpecificRecord&gt;
  2. implements Serializer&lt;T&gt; {
  3. private final KafkaAvroSerializer inner;
  4. public EventSerializer(Map&lt;String, ?&gt; properties) {
  5. inner = new KafkaAvroSerializer();
  6. configure(properties, false);
  7. }
  8. @Override
  9. public void configure(Map&lt;String, ?&gt; configs, boolean isKey) {
  10. inner.configure(EventSerdeConfig.withProducerConfig(configs), isKey);
  11. }
  12. @Override
  13. public byte[] serialize(final String topic, final T record) {
  14. return inner.serialize(topic, record);
  15. }
  16. }

When processor puts event into keyValueStore, I hit en error io.confluent.rest.exceptions.RestNotFoundException: Subject not found. After debugging for a while, i realized that it's because of the serializer has trouble when serializing events. The topic in the function public byte[] serialize(final String topic, final T record) is application id-store-changelog. It's kafka inner behavior even though i don't know why. Serialzer can't find schema for this combined topic, thus throwing an error.
Do i need to register schema for this combined topic or is there any way to pass the real consumer topic into serialzer who already has schema registered?

答案1

得分: 2

当你使用 new KafkaAvroSerializer(); 时,默认指向 localhost:8081 作为Schema Registry。

你不需要注册(尽管可以这样做),因为生产者在序列化逻辑中执行了这个操作,使用了 inner.serialize

注意:扩展 KafkaAvroSerializer 可能更有意义。

英文:

When you have new KafkaAvroSerializer(); , it defaults to point at localhost:8081 for the Schema Registry.

You don't need to register (although you can), since the Producer does it as part of the serialization logic with inner.serialize

Note: extending KafkaAvroSerializer might make more sense

huangapple
  • 本文由 发表于 2020年8月5日 15:14:47
  • 转载请务必保留本文链接:https://go.coder-hub.com/63260163.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定