Getting an error when deserializing a Java object from Kafka

Question


I have started learning Kafka, and I am currently working on sending and receiving a serialized/deserialized Java class.
My question is: what have I missed in my configuration that prevents me from deserializing the object from Kafka?

Here is my class:

public class Foo {
    private String item;
    private int quantity;
    private Double price;

    public Foo(String item, int quantity, final double price) {
        this.item = item;
        this.quantity = quantity;
        this.price = price;
    }

    public String getItem() { return item; }
    public int getQuantity() { return quantity; }
    public Double getPrice() { return price; }

    public void setQuantity(int quantity) { this.quantity = quantity; }
    public void setPrice(double price) { this.price = price; }

    @Override
    public String toString() {
        return "item=" + item + ", quantity=" + quantity + ", price=" + price;
    }
}

My properties in the main class:

producerPropsObject.put(ProducerConfig.CLIENT_ID_CONFIG, AppConfigs.applicationProducerSerializedObject);
producerPropsObject.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, AppConfigs.bootstrapServers);
producerPropsObject.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
producerPropsObject.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, FooSerializer.class.getName());
producerPropsObject.put("topic", AppConfigs.topicNameForSerializedObject);

consumerPropsObject.put(ConsumerConfig.GROUP_ID_CONFIG, AppConfigs.applicationProducerSerializedObject);
consumerPropsObject.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, AppConfigs.bootstrapServers);
consumerPropsObject.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerPropsObject.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, FooDeserializer.class.getName());
consumerPropsObject.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300000);
consumerPropsObject.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
consumerPropsObject.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
consumerPropsObject.put("topic", AppConfigs.topicNameForSerializedObject);

The following are the serializer and deserializer implementations:

public class FooSerializer implements org.apache.kafka.common.serialization.Serializer {

    public void configure(Map map, boolean b) { }

    public byte[] serialize(String s, Object o) {
        try {
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            ObjectOutputStream oos = new ObjectOutputStream(baos);
            oos.writeObject(o);
            oos.close();
            byte[] b = baos.toByteArray();
            return b;
        } catch (IOException e) { return new byte[0]; }
    }

    public void close() { }
}

public class FooDeserializer implements org.apache.kafka.common.serialization.Deserializer {

    @Override
    public void close() { }

    @Override
    public Foo deserialize(String arg0, byte[] arg1) {

        //Option #1:
        //ObjectMapper mapper = new ObjectMapper();
        //Option #2:
        JsonFactory factory = new JsonFactory();
        factory.enable(JsonParser.Feature.ALLOW_SINGLE_QUOTES);
        ObjectMapper mapper = new ObjectMapper(factory);
        Foo fooObj = null;
        try {
            //Option #1:
            //fooObj = mapper.readValue(arg1, Foo.class); // BREAKS HERE!!!
            //Option #2:
            fooObj = mapper.reader().forType(Foo.class).readValue(arg1); // BREAKS HERE!!!
        }
        catch (Exception e) { e.printStackTrace(); }

        return fooObj;
    }
}

Finally, this is how I'm trying to produce and consume my Foo from main.

The producer seems to work fine, because I can see my key and value in the Kafka topic afterwards:

public void produceObjectToKafka(final Properties producerProps) {
    final String[] ar = new String[]{"Matrix", "Naked Gun", "5th Element", "Die Hard", "Gone with a wind"};
    KafkaProducer<String, byte[]> producer = new KafkaProducer<>(producerProps);
    final Foo j = new Foo(ar[getAnInt(4)], getAnInt(10), getAnDouble());
    producer.send(new ProducerRecord<>(producerProps.getProperty("topic"), j.getItem(), j.toString().getBytes()));
    producer.flush();
    producer.close();
}

However, this is the consumer that reads the output:

public void consumeFooFromKafka(final Properties consumerProps) {
    final Consumer<String, Foo> myConsumer = new KafkaConsumer<>(consumerProps);
    final Thread separateThread = new Thread(() -> {
        try {
            myConsumer.subscribe(Collections.singletonList(consumerProps.getProperty("topic")));
            while (continueToRunFlag) {
                final StringBuilder sb = new StringBuilder();
                final ConsumerRecords<String, Foo> consumerRecords = myConsumer.poll(Duration.ofMillis(10));
                if (consumerRecords.count() > 0) {
                    for (ConsumerRecord<String, Foo> cRec : consumerRecords) {
                        sb.append(cRec.key()).append("<<").append(cRec.value().getItem() + ",").append(cRec.value().getQuantity() + ",").append(cRec.value().getPrice()).append("|");
                    }
                }
                if (sb.length() > 0) { System.out.println(sb.toString()); }
            }
        }
        finally {
            myConsumer.close();
        }
    });
    separateThread.start();
}

=======================================
So when I run consumeFooFromKafka and it triggers FooDeserializer, I always get the same error there, regardless of whether I use Option #1 or Option #2:

> exception:
> Method threw 'com.fasterxml.jackson.core.JsonParseException' exception.
> detailedMessage:
> Unexpected character ('¬' (code 172)): expected a valid value (JSON String, Number, Array, Object or token 'null', 'true' or 'false')

Any help would be very much appreciated.
Thank you in advance,
Steve

Answer 1

Score: 0


I don't know why you're using a ByteArrayOutputStream (Java object serialization) in the serializer while trying to read JSON in the deserializer, but that explains the error. You could even test this without using Kafka at all by invoking the serialize/deserialize methods directly.
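
The mismatch can be reproduced without Kafka. The following is a minimal sketch using only the JDK; the class name `JavaStreamHeaderDemo` and the sample string are made up for illustration. It applies the same `ObjectOutputStream` technique as the question's FooSerializer and prints the first bytes of the result. A Java serialization stream begins with the magic number 0xACED, and byte 0xAC is 172, which is the '¬' character that Jackson reports as unexpected.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

// Hypothetical demo class, not part of the original post.
class JavaStreamHeaderDemo {

    // Same technique as the question's FooSerializer: Java-native serialization.
    static byte[] javaSerialize(Object o) {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(baos)) {
            oos.writeObject(o);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return baos.toByteArray();
    }

    public static void main(String[] args) {
        // Sample payload (an assumption), shaped like the output of Foo.toString().
        byte[] value = "item=Matrix, quantity=3, price=9.99".getBytes(StandardCharsets.UTF_8);
        byte[] wire = javaSerialize(value);

        // The stream header is the magic number 0xACED, so this is not JSON text.
        System.out.printf("first bytes: 0x%02X 0x%02X%n", wire[0] & 0xFF, wire[1] & 0xFF);
        System.out.println("first byte as code point: " + (wire[0] & 0xFF));
    }
}
```

The sketch serializes a `byte[]` on purpose: the producer in the question passes `j.toString().getBytes()` as the record value, so the configured serializer appears to receive a byte array rather than a Foo.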

In the link provided, the serializer uses objectMapper.writeValueAsString, which returns JSON text rather than the Java-specific object stream. If you want to produce and consume data across different programming languages (as is the case at most companies), you'll want to avoid such language-specific serialization formats.
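
To illustrate what "Java-specific" means here, the sketch below pairs an `ObjectOutputStream` writer with its matching `ObjectInputStream` reader, using only the JDK. It is not the approach recommended in this answer; it only shows that such bytes are readable by a Java reader that has the same class available. `SerializableFoo` and `JavaNativeRoundTrip` are hypothetical names, and `SerializableFoo` is an assumed variant of Foo that implements `java.io.Serializable`, which the Foo in the question does not.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Hypothetical variant of the question's Foo that opts in to Java serialization.
class SerializableFoo implements Serializable {
    private static final long serialVersionUID = 1L;
    final String item;
    final int quantity;
    final double price;

    SerializableFoo(String item, int quantity, double price) {
        this.item = item;
        this.quantity = quantity;
        this.price = price;
    }
}

// Hypothetical helper showing a writer and the reader that matches it.
class JavaNativeRoundTrip {

    static byte[] write(Object o) {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(baos)) {
            oos.writeObject(o);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return baos.toByteArray();
    }

    static Object read(byte[] bytes) {
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return ois.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            // The reader needs the very same class on its classpath.
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        SerializableFoo in = new SerializableFoo("Matrix", 3, 9.99);
        SerializableFoo out = (SerializableFoo) read(write(in));
        System.out.println(out.item + ", " + out.quantity + ", " + out.price);
    }
}
```

A consumer written in another language has no `ObjectInputStream` and no `SerializableFoo` class, which is why a text format such as JSON is usually the easier choice across languages.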

Note: Confluent provides Avro, Protobuf, and JSON serializers for Kafka, so you shouldn't need to write your own if you want to use one of those formats.

Answer 2

Score: 0


If you want to deserialize from JSON, then you need to serialize to JSON. Use Jackson in your serializer as well, and everything should be fine:

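import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Map;

import com.fasterxml.jackson.databind.ObjectMapper;

public class FooSerializer implements org.apache.kafka.common.serialization.Serializer {

    // ObjectMapper from Jackson; one instance is reused for every record
    private final ObjectMapper om = new ObjectMapper();

    public void configure(Map map, boolean b) { }

    public byte[] serialize(String s, Object o) {
        try {
            // Write JSON text so that the Jackson-based deserializer can parse it
            return om.writeValueAsString(o).getBytes(StandardCharsets.UTF_8);
        } catch (IOException e) { return new byte[0]; }
    }

    public void close() { }
}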



huangapple
  • Posted on October 3, 2020, 10:47:22
  • Please keep this link when reposting: https://go.coder-hub.com/64180238.html