How to write protobuf byte array from Flink to Kafka
Question
I'm new to Flink. All I want to do is put my protobuf POJO into Kafka as a byte array. So my FlinkKafkaProducer
looks like this:
FlinkKafkaProducer<String> flinkKafkaProducer = createStringProducer(outputTopic, address);

stringInputStream
    .map(//here returns byte[])
    .addSink(flinkKafkaProducer);

public static FlinkKafkaProducer<String> createStringProducer(String topic, String kafkaAddress) {
    return new FlinkKafkaProducer<>(kafkaAddress, topic, new SimpleStringSchema());
}
Right now it works fine, but my output is a String. I've tried using TypeInformationSerializationSchema()
instead of new SimpleStringSchema()
to change the output, but I can't figure out how to set it up correctly. I can't find any tutorial. Could someone help?
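A minimal sketch of one way to do this, assuming the map() step already emits protobuf wire-format bytes (the schema class name below is made up for illustration, and the constructor is the same one used above):

public class ByteArrayPassThroughSchema implements SerializationSchema<byte[]> {
    @Override
    public byte[] serialize(byte[] element) {
        // the bytes are already serialized protobuf, so write them to Kafka unchanged
        return element;
    }
}

public static FlinkKafkaProducer<byte[]> createByteProducer(String topic, String kafkaAddress) {
    return new FlinkKafkaProducer<>(kafkaAddress, topic, new ByteArrayPassThroughSchema());
}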
Answer 1
Score: 2
So, I finally figured out how to write protobuf to a Kafka producer as a byte array. The problem was with serialization. For POJOs, Flink uses the Kryo
library for custom de/serialization. The best way to write protobuf is to use ProtobufSerializer.class
. In this example I read a String message from Kafka and write it back as a byte array.
Gradle dependencies:
compile(group: 'com.twitter', name: 'chill-protobuf', version: '0.7.6') {
    exclude group: 'com.esotericsoftware.kryo', module: 'kryo'
}
implementation 'com.google.protobuf:protobuf-java:3.11.0'
Registration:
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
environment.getConfig().registerTypeWithKryoSerializer(MyProtobuf.class, ProtobufSerializer.class);
Kafka serializer class:
@Data
@RequiredArgsConstructor
public class MyProtoKafkaSerializer implements KafkaSerializationSchema<MyProto> {

    private final String topic;
    private final byte[] key;

    @Override
    public ProducerRecord<byte[], byte[]> serialize(MyProto element, Long timestamp) {
        return new ProducerRecord<>(topic, key, element.toByteArray());
    }
}
Job:
public static FlinkKafkaProducer<MyProto> createProtoProducer(String topic, String kafkaAddress) {
    // the serializer takes the topic and an optional record key (null here)
    MyProtoKafkaSerializer myProtoKafkaSerializer = new MyProtoKafkaSerializer(topic, null);
    Properties props = new Properties();
    props.setProperty("bootstrap.servers", kafkaAddress);
    return new FlinkKafkaProducer<>(topic, myProtoKafkaSerializer, props, FlinkKafkaProducer.Semantic.AT_LEAST_ONCE);
}
public static FlinkKafkaConsumer<String> createProtoConsumerForTopic(String topic, String kafkaAddress, String kafkaGroup) {
    Properties props = new Properties();
    props.setProperty("bootstrap.servers", kafkaAddress);
    props.setProperty("group.id", kafkaGroup);
    return new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), props);
}
// inputTopic, address and kafkaGroup are assumed to be defined elsewhere
FlinkKafkaConsumer<String> flinkKafkaConsumer = createProtoConsumerForTopic(inputTopic, address, kafkaGroup);
DataStream<String> stringInputStream = environment.addSource(flinkKafkaConsumer);
FlinkKafkaProducer<MyProto> flinkKafkaProducer = createProtoProducer(outputTopic, address);

stringInputStream
    .map(hashtagMapFunction)
    .addSink(flinkKafkaProducer);

environment.execute("My test job");
Answer 2
Score: 0
It does seem tricky to find documentation on this. I'll assume you're using Flink >= 1.9. In that case, the following should work:
private static class PojoKafkaSerializationSchema implements KafkaSerializationSchema<YourPojo> {

    @Override
    public void open(SerializationSchema.InitializationContext context) throws Exception {}

    @Override
    public ProducerRecord<byte[], byte[]> serialize(YourPojo element, @Nullable Long timestamp) {
        // serialize your POJO here and return a Kafka ProducerRecord
        return null;
    }
}
// Elsewhere:
PojoKafkaSerializationSchema schema = new PojoKafkaSerializationSchema();
FlinkKafkaProducer<YourPojo> kafkaProducer = new FlinkKafkaProducer<>(
    "test-topic",
    schema,
    properties,
    FlinkKafkaProducer.Semantic.AT_LEAST_ONCE
);
This code is mostly inspired by this test case, but I didn't have time to actually run it.
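For the protobuf case from the question, the placeholder serialize body could be filled in along these lines (a sketch only; MyProto stands in for a generated protobuf message class, and the topic name is the one hard-coded above):

@Override
public ProducerRecord<byte[], byte[]> serialize(MyProto element, @Nullable Long timestamp) {
    // every generated protobuf message exposes its wire format via toByteArray()
    return new ProducerRecord<>("test-topic", element.toByteArray());
}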