How to write protobuf byte array from Flink to Kafka

Question

I'm new to Flink. All I want to do is write my protobuf POJO to Kafka as a byte array. My FlinkKafkaProducer looks like this:

FlinkKafkaProducer<String> flinkKafkaProducer = createStringProducer(outputTopic, address);
stringInputStream
        .map(/* returns byte[] here */)
        .addSink(flinkKafkaProducer);

public static FlinkKafkaProducer<String> createStringProducer(String topic, String kafkaAddress) {
    return new FlinkKafkaProducer<>(kafkaAddress, topic, new SimpleStringSchema());
}

Right now it works fine, but my output is a String. I've tried using TypeInformationSerializationSchema instead of SimpleStringSchema to change the output, but I can't figure out how to set it up correctly, and I can't find any tutorial. Could someone help?

Answer 1

Score: 2

I finally figured out how to write protobuf messages to Kafka as byte arrays. The problem was serialization. For types it cannot handle with its own serializers, such as generated protobuf classes, Flink falls back to the Kryo library. The best way I found to handle protobuf is to register ProtobufSerializer.class with Kryo. In this example I read String messages from Kafka and write them back as byte arrays.

Gradle dependencies:

implementation(group: 'com.twitter', name: 'chill-protobuf', version: '0.7.6') {
    exclude group: 'com.esotericsoftware.kryo', module: 'kryo'
}
implementation 'com.google.protobuf:protobuf-java:3.11.0'

Registration:

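// imports: org.apache.flink.streaming.api.environment.StreamExecutionEnvironment,
//          com.twitter.chill.protobuf.ProtobufSerializer
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
environment.getConfig().registerTypeWithKryoSerializer(MyProto.class, ProtobufSerializer.class);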

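Kafka serializer class:

import lombok.Data;
import lombok.RequiredArgsConstructor;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;

@Data
@RequiredArgsConstructor
public class MyProtoKafkaSerializer implements KafkaSerializationSchema<MyProto> {
    private final String topic;
    private final byte[] key; // record key, reused for every record

    @Override
    public ProducerRecord<byte[], byte[]> serialize(MyProto element, Long timestamp) {
        // toByteArray() yields the protobuf wire format, which becomes the Kafka record value
        return new ProducerRecord<>(topic, key, element.toByteArray());
    }
}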
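
A small sketch of why the raw bytes should go to Kafka directly rather than through a String-based schema. It uses only the JDK; the three bytes are a hand-written stand-in for what `element.toByteArray()` might return (the protobuf wire encoding of "field 1 = 150"), and the class and method names are made up for illustration:

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

class StringRoundTripDemo {
    // Hand-written protobuf wire bytes for "field 1 = 150"; a stand-in for element.toByteArray().
    static final byte[] PROTO_BYTES = {0x08, (byte) 0x96, 0x01};

    // Decodes the bytes as UTF-8 text and encodes the text again,
    // which is what happens if the payload is turned into a String on the way to Kafka.
    static byte[] throughString(byte[] raw) {
        String asText = new String(raw, StandardCharsets.UTF_8);
        return asText.getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] roundTripped = throughString(PROTO_BYTES);
        System.out.println("original:   " + Arrays.toString(PROTO_BYTES));
        System.out.println("via String: " + Arrays.toString(roundTripped));
        System.out.println("identical:  " + Arrays.equals(PROTO_BYTES, roundTripped));
    }
}
```

The byte 0x96 is not valid UTF-8 on its own, so decoding replaces it and the re-encoded array no longer matches the original. Plain ASCII text survives the same round trip unchanged, which is likely why a String schema appears to work until binary data is involved.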

Job:

public static FlinkKafkaProducer<MyProto> createProtoProducer(String topic, String kafkaAddress) {
    // The Lombok-generated constructor takes (topic, key); a null key lets Kafka choose the partition
    MyProtoKafkaSerializer myProtoKafkaSerializer = new MyProtoKafkaSerializer(topic, null);
    Properties props = new Properties();
    props.setProperty("bootstrap.servers", kafkaAddress);
    return new FlinkKafkaProducer<>(topic, myProtoKafkaSerializer, props, FlinkKafkaProducer.Semantic.AT_LEAST_ONCE);
}

public static FlinkKafkaConsumer<String> createProtoConsumerForTopic(String topic, String kafkaAddress, String kafkaGroup) {
    Properties props = new Properties();
    props.setProperty("bootstrap.servers", kafkaAddress);
    props.setProperty("group.id", kafkaGroup);
    return new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), props);
}

// inputTopic, outputTopic, address and consumerGroup are assumed to be defined elsewhere
FlinkKafkaConsumer<String> flinkKafkaConsumer = createProtoConsumerForTopic(inputTopic, address, consumerGroup);
DataStream<String> stringInputStream = environment.addSource(flinkKafkaConsumer);
FlinkKafkaProducer<MyProto> flinkKafkaProducer = createProtoProducer(outputTopic, address);
stringInputStream
        .map(hashtagMapFunction) // maps each String to a MyProto message
        .addSink(flinkKafkaProducer);

environment.execute("My test job");

Sources:

  1. https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/custom_serializers.html#register-a-custom-serializer-for-your-flink-program
  2. https://flink.apache.org/news/2020/04/15/flink-serialization-tuning-vol-1.html#protobuf-via-kryo

Answer 2

Score: 0

It does seem hard to find documentation on this. I'll assume you use Flink >= 1.9. In that case, the following should work:

private static class PojoKafkaSerializationSchema implements KafkaSerializationSchema<YourPojo> {
	@Override
	public void open(SerializationSchema.InitializationContext context) throws Exception {}

	@Override
	public ProducerRecord<byte[], byte[]> serialize(YourPojo element, @Nullable Long timestamp) {
		// serialize your POJO here and return a Kafka `ProducerRecord`
		return null; // placeholder
	}
}

// Elsewhere:
PojoKafkaSerializationSchema schema = new PojoKafkaSerializationSchema();
FlinkKafkaProducer<YourPojo> kafkaProducer = new FlinkKafkaProducer<>(
	"test-topic",
	schema,
	properties,
	FlinkKafkaProducer.Semantic.AT_LEAST_ONCE
);

This code is mostly inspired by this test case, but I didn't have time to actually run it.
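
For the "serialize your POJO here" step, one possible sketch using only the JDK is shown below. The fields of `YourPojo` and the helper names `toBytes` and `fromBytes` are assumptions made up for illustration; a protobuf message would not need any of this, since its generated `toByteArray()` already produces the bytes.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

class PojoBytesSketch {
    // Hypothetical POJO; the real YourPojo will have its own fields.
    static final class YourPojo {
        final long id;
        final String name;

        YourPojo(long id, String name) {
            this.id = id;
            this.name = name;
        }
    }

    // Encodes the POJO as an 8-byte long followed by a length-prefixed string.
    static byte[] toBytes(YourPojo pojo) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(buffer)) {
            out.writeLong(pojo.id);
            out.writeUTF(pojo.name);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }

    // Reads the fields back in the same order they were written.
    static YourPojo fromBytes(byte[] bytes) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes))) {
            return new YourPojo(in.readLong(), in.readUTF());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Inside `serialize`, the placeholder could then become something like `return new ProducerRecord<>("test-topic", PojoBytesSketch.toBytes(element));`, with whatever reads the topic using the matching `fromBytes`.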

huangapple
  • Posted on September 18, 2020, 15:31:28
  • Please keep the link to this article when reposting: https://go.coder-hub.com/63951160.html