英文:
Do i need to register schema for kafka stream changelog topic under schema registry?
问题
我正在使用Processor API和Kafka StreamDSL实现Kafka流项目。
我的处理器中的处理函数是:
@Override
public void process(final String key, final T event) {
keyValueStore.put(key, event);
}
我的拓扑是:
protected Topology buildTopology() {
final StreamsBuilder builder = new StreamsBuilder();
KeyValueBytesStoreSupplier storeSupplier = Stores.persistentKeyValueStore(stateStoreName);
StoreBuilder<KeyValueStore<String, T>> storeBuilder =
Stores.keyValueStoreBuilder(storeSupplier, Serdes.String(), Serdes.serdeFrom(
new EventSerializer(streamProperties()),
new EventDeserializer(streamProperties())));
builder.addStateStore(storeBuilder);
final KStream<String, T> stream = builder.stream(inputTopic);
stream.process(() -> new Processor<> (stateStoreName), stateStoreName);
stream.to(outputTopic);
return builder.build();
}
最后,这是我的自定义EventSerializer类:
public class EventSerializer<T extends SpecificRecordBase & SpecificRecord>
implements Serializer<T> {
private final KafkaAvroSerializer inner;
public EventSerializer(Map<String, ?> properties) {
inner = new KafkaAvroSerializer();
configure(properties, false);
}
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
inner.configure(EventSerdeConfig.withProducerConfig(configs), isKey);
}
@Override
public byte[] serialize(final String topic, final T record) {
return inner.serialize(topic, record);
}
}
当处理器将事件放入keyValueStore时,我遇到了错误io.confluent.rest.exceptions.RestNotFoundException: Subject not found. 经过一段时间的调试,我意识到这是因为在序列化事件时序列化器出现了问题。函数public byte[] serialize(final String topic, final T record)
中的主题是application id-store-changelog
。尽管我不知道原因,但这是Kafka的内部行为。序列化器无法找到此组合主题的模式,因此抛出错误。
我是否需要为此组合主题注册模式,或者是否有办法将已注册模式的真实消费者主题传递给序列化器?
英文:
i am implementing kafka stream project using Processor API and Kafka StreamDSL.
My process function in processor is
@Override
public void process(final String key, final T event) {
keyValueStore.put(key, event);
}
My Topology is
protected Topology buildTopology() {
final StreamsBuilder builder = new StreamsBuilder();
KeyValueBytesStoreSupplier storeSupplier = Stores.persistentKeyValueStore(stateStoreName);
StoreBuilder<KeyValueStore<String, T>> storeBuilder =
Stores.keyValueStoreBuilder(storeSupplier, Serdes.String(), Serdes.serdeFrom(
new EventSerializer(streamProperties()),
new EventDeserializer(streamProperties())));
builder.addStateStore(storeBuilder);
final KStream<String, T> stream = builder.stream(inputTopic);
stream.process(() -> new Processor<>(stateStoreName), stateStoreName);
stream.to(outputTopic);
return builder.build();
}
And Lastly this is my custom EventSerializer class:
public class EventSerializer<T extends SpecificRecordBase & SpecificRecord>
implements Serializer<T> {
private final KafkaAvroSerializer inner;
public EventSerializer(Map<String, ?> properties) {
inner = new KafkaAvroSerializer();
configure(properties, false);
}
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
inner.configure(EventSerdeConfig.withProducerConfig(configs), isKey);
}
@Override
public byte[] serialize(final String topic, final T record) {
return inner.serialize(topic, record);
}
}
When processor puts event into keyValueStore, I hit en error io.confluent.rest.exceptions.RestNotFoundException: Subject not found. After debugging for a while, i realized that it's because of the serializer has trouble when serializing events. The topic in the function public byte[] serialize(final String topic, final T record)
is application id-store-changelog
. It's kafka inner behavior even though i don't know why. Serialzer can't find schema for this combined topic, thus throwing an error.
Do i need to register schema for this combined topic or is there any way to pass the real consumer topic into serialzer who already has schema registered?
答案1
得分: 2
当你使用 new KafkaAvroSerializer();
时,默认指向 localhost:8081
作为Schema Registry。
你不需要注册(尽管可以这样做),因为生产者在序列化逻辑中执行了这个操作,使用了 inner.serialize
。
注意:扩展 KafkaAvroSerializer
可能更有意义。
英文:
When you have new KafkaAvroSerializer();
, it defaults to point at localhost:8081
for the Schema Registry.
You don't need to register (although you can), since the Producer does it as part of the serialization logic with inner.serialize
Note: extending KafkaAvroSerializer
might make more sense
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论