Failed to convert from JSON;Unexpected character ('f' (code 102)): Expected space separating root-level values at (String)"5f19a7e99933db43cb23e83d"


Question

@KafkaListener(id = ProductTopicConstants.GET_PRODUCT, topics = ProductTopicConstants.GET_PRODUCT)
@SendTo
public Product GetProduct(String id) {
    return _productRepository.findByid(id);
}

Kafka Configuration

@Configuration
public class KafkaConfiguration {

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        return props;
    }

    @Bean
    public ConsumerFactory<String, Object> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs(),
                new StringDeserializer(),
                new JsonDeserializer<>(Object.class));
    }

    @Bean
    public ReplyingKafkaTemplate<String, Object, Object> replyer(ProducerFactory<String, Object> pf,
                                                                 ConcurrentKafkaListenerContainerFactory<String, Object> containerFactory) {
        containerFactory.setReplyTemplate(kafkaTemplate(pf));
        ConcurrentMessageListenerContainer<String, Object> container = replyContainer(containerFactory);
        ReplyingKafkaTemplate<String, Object, Object> replyer = new ReplyingKafkaTemplate<>(pf, container);
        return replyer;
    }

    @Bean
    public ConcurrentMessageListenerContainer<String, Object> replyContainer(
            ConcurrentKafkaListenerContainerFactory<String, Object> containerFactory) {
        ConcurrentMessageListenerContainer<String, Object> container =
                containerFactory.createContainer(ProductTopicConstants.LISTNER_CONTAINER);
        container.getContainerProperties().setGroupId(ProductTopicConstants.LISTNER_CONTAINER);
        containerFactory.setMessageConverter(new JsonMessageConverter());
        container.setBatchErrorHandler(new BatchLoggingErrorHandler());
        return container;
    }

    @Bean
    public KafkaTemplate<String, Object> kafkaTemplate(ProducerFactory<String, Object> pf) {
        return new KafkaTemplate<>(pf);
    }
}

Kafka Producer

@Override
public ProductViewModel GetProduct(String id) throws InterruptedException, ExecutionException, TimeoutException {
    RequestReplyFuture<String, Object, Object> future =
            this._replyTemplate.sendAndReceive(new ProducerRecord<>(ProductTopicConstants.GET_PRODUCT, 0, null, id));
    LOG.info(future.getSendFuture().get(10, TimeUnit.SECONDS).getRecordMetadata().toString());
    var products = (Product) future.get(10, TimeUnit.SECONDS).value();
    var mappedProducts = mapper.convertValue(products, new TypeReference<Product>() {
    });
    return new ProductViewModel(mappedProducts.getId(), mappedProducts.getName(), mappedProducts.getPrice(), mappedProducts.getDescription());
}

Kafka Configuration

spring:
  profiles:
    active: dev
  kafka:
    bootstrapAddress: localhost:9092
    producer:
      key-serializer: org.springframework.kafka.support.serializer.JsonSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
    consumer:
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      auto-offset-reset: earliest
      properties:
        spring:
          json:
            trusted:
              packages: '*'

Topic

@Bean
public static NewTopic GetProduct() {
    return new NewTopic(ProductTopicConstants.GET_PRODUCT, 1, (short) 1);
}

@Bean
public NewTopic GetProducts() {
    return TopicBuilder.name(ProductTopicConstants.GET_PRODUCTS).partitions(1).replicas(1).build();
}

@Bean
public NewTopic GetProductsContainer() {
    return TopicBuilder.name(ProductTopicConstants.LISTNER_CONTAINER).partitions(1).replicas(1).build();
}

Error

org.springframework.kafka.KafkaException: Seek to current after exception; nested exception is org.springframework.kafka.listener.ListenerExecutionFailedException: Listener failed; nested exception is org.springframework.kafka.support.converter.ConversionException: Failed to convert from JSON; nested exception is com.fasterxml.jackson.core.JsonParseException: Unexpected character ('f' (code 102)): Expected space separating root-level values
 at [Source: (String)"5f19a7e99933db43cb23e83d"; line: 1, column: 3]
at org.springframework.kafka.listener.SeekUtils.seekOrRecover(SeekUtils.java:157) ~[spring-kafka-2.5.2.RELEASE.jar:2.5.2.RELEASE]
at org.springframework.kafka.listener.SeekToCurrentErrorHandler.handle(SeekToCurrentErrorHandler.java:103) ~[spring-kafka-2.5.2.RELEASE.jar:2.5.2.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeErrorHandler(KafkaMessageListenerContainer.java:1887) ~[spring-kafka-2.5.2.RELEASE.jar:2.5.2.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:1792) ~[spring-kafka-2.5.2.RELEASE.jar:2.5.2.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:1719) ~[spring-kafka-2.5.2.RELEASE.jar:2.5.2.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:1617) ~[spring-kafka-2.5.2.RELEASE.jar:2.5.2.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:1348) ~[spring-kafka-2.5.2.RELEASE.jar:2.5.2.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1064) ~[spring-kafka-2.5.2.RELEASE.jar:2.5.2.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:972) ~[spring-kafka-2.5.2.RELEASE.jar:2.5.2.RELEASE]
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[na:na]
at java.base/java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:264) ~[na:na]
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java) ~[na:na]
at java.base/java.lang.Thread.run(Thread.java:832) ~[na:na]
Caused by: org.springframework.kafka.listener.ListenerExecutionFailedException: Listener failed; nested exception is org.springframework.kafka.support.converter.ConversionException: Failed to convert from JSON; nested exception is com.fasterxml.jackson.core.JsonParseException: Unexpected character ('f' (code 102)): Expected space separating root-level values
 at [Source: (String)"5f19a7e99933db43cb23e83d"; line: 1, column: 3]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.decorateException(KafkaMessageListenerContainer.java:1902) ~[spring-kafka-2.5.2.RELEASE.jar:2.5.2.RELEASE]
... 11 common frames omitted
Caused by: org.springframework.kafka.support.converter.ConversionException: Failed to convert from JSON; nested exception is com.fasterxml.jackson.core.JsonParseException: Unexpected character ('f' (code 102)): Expected space separating root-level values
 at [Source: (String)"5f19a7e99933db43cb23e83d"; line: 1, column: 3]
at org.springframework.kafka.support.converter.JsonMessageConverter.extractAndConvertValue(JsonMessageConverter.java:117) ~[spring-kafka-2.5.2.RELEASE.jar:2.5.2.RELEASE]
at org.springframework.kafka.support.converter.MessagingMessageConverter.toMessage(MessagingMessageConverter.java:123) ~[spring-kafka-2.5.2.RELEASE.jar:2.5.2.RELEASE]
at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.toMessagingMessage(MessagingMessageListenerAdapter.java:309) ~[spring-kafka-2.5.2.RELEASE.jar:2.5.2.RELEASE]
at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:77) ~[spring-kafka-2.5.2.RELEASE.jar:2.5.2.RELEASE]
at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:51) ~[spring-kafka-2.5.2.RELEASE.jar:2.5.2.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:1854) ~[spring-kafka-2.5.2.RELEASE.jar:2.5.2.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:1836) ~[spring-kafka-2.5.2.RELEASE.jar:2.5.2.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:1779) ~[spring-kafka-2.5.2.RELEASE.jar:2.5.2.RELEASE]
... 9 common frames omitted
Caused by: com.fasterxml.jackson.core.JsonParseException: Unexpected character ('f' (code 102)): Expected space separating root-level values
 at [Source: (String)"5f19a7e99933db43cb23e83d"; line: 1, column: 3]
at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1851) ~[jackson-core-2.11.0.jar:2.11.0]
at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:707) ~[jackson-core-2.11.0.jar:2.11.0]
at com.fasterxml.jackson.core.base.ParserMinimalBase._reportUnexpectedChar(ParserMinimalBase.java:632) ~[jackson-core-2.11.0.jar:2.11.0]
at com.fasterxml.jackson.core.base.ParserMinimalBase._reportMissingRootWS(ParserMinimalBase.java:679) ~[jackson-core-2.11.0.jar:2.11.0]
at com.fasterxml.jackson.core.json.ReaderBasedJsonParser._verifyRootSpace(ReaderBasedJsonParser.java:1703) ~[jackson-core-2.11.0.jar:2.11.0]
at com.fasterxml.jackson.core.json.ReaderBasedJsonParser._parsePosNumber(ReaderBasedJsonParser.java:1346) ~[jackson-core-2.11.0.jar:2.11.0]
at com.fasterxml.jackson.core.json.ReaderBasedJsonParser.nextToken(ReaderBasedJsonParser.java:773) ~[jackson-core-2.11.0.jar:2.11.0]
at com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:4620) ~[jackson-databind-2.11.0.jar:2.11.0]
at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:4469) ~[jackson-databind-2.11.0.jar:2.11.0]
at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3434) ~[jackson-databind-2.11.0.jar:2.11.0]
at org.springframework.kafka.support.converter.JsonMessageConverter.extractAndConvertValue(JsonMessageConverter.java:114) ~[spring-kafka-2.5.2.RELEASE.jar:2.5.2.RELEASE]
... 16 common frames omitted
2020-07-26 21:52:47.026  INFO 3997 --- [etProduct-0-C-1] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=consumer-GetProduct-4, groupId=GetProduct] Seeking to offset 25 for partition GetProduct-0
2020-07-26 21:52:47.029 ERROR 3997 --- [etProduct-0-C-1] essageListenerContainer$ListenerConsumer : Error handler threw an exception

Output in Consumer console

Failed to convert from JSON;Unexpected character ('f' (code 102)): Expected space separating root-level values at (String)"5f19a7e99933db43cb23e83d"

I have tried every combination I could think of, but none of them works.

Answer 1

Score: 1

You need to show the consumer-side configuration. You are doing double JSON conversion: a JsonMessageConverter is being applied, which is unnecessary because the JsonDeserializer has already converted the JSON to a String. Either remove the converter, or replace the deserializer with a simple StringDeserializer or ByteArrayDeserializer so that the converter has something to convert.
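
To see why the second conversion fails, here is a minimal stdlib-only sketch of the same round trip. It does not use Jackson or Spring Kafka; `encode` and `decode` are hypothetical helpers that only handle a quoted string with no escape sequences, standing in for the JSON serializer on the producer side and the two JSON decoding steps on the consumer side.

```java
// Stdlib-only stand-in for the serializer / deserializer / converter chain.
// encode() and decode() are hypothetical helpers, not Spring Kafka or Jackson APIs.
final class DoubleDecodeSketch {

    /** Wraps a plain string in quotes, as a JSON serializer would for a String payload.
     *  Escape sequences are ignored to keep the sketch short. */
    static String encode(String value) {
        return "\"" + value + "\"";
    }

    /** Decodes a JSON string literal; rejects any input that is not quoted. */
    static String decode(String json) {
        if (json.length() < 2 || !json.startsWith("\"") || !json.endsWith("\"")) {
            throw new IllegalArgumentException("Not a JSON string literal: " + json);
        }
        return json.substring(1, json.length() - 1);
    }

    public static void main(String[] args) {
        String id = "5f19a7e99933db43cb23e83d";

        // Producer side: the id is sent as a quoted JSON string.
        String onTheWire = encode(id);

        // First decode (the deserializer's job): back to the bare id.
        String afterDeserializer = decode(onTheWire);
        System.out.println(onTheWire + " -> " + afterDeserializer);

        // Second decode (the converter's job): the bare id is no longer JSON.
        try {
            decode(afterDeserializer);
        } catch (IllegalArgumentException e) {
            System.out.println("second decode failed: " + e.getMessage());
        }
    }
}
```

The first `decode` call plays the role of the JsonDeserializer and the second plays the role of the message converter. In the real stack, the second step is where Jackson reads `5` as a number and then rejects the `f` that follows it, which matches the `_parsePosNumber` and `_verifyRootSpace` frames in the stack trace from the question.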

EDIT

So, the problem is that your ProductListener has this bean:

@Bean
public RecordMessageConverter converter() {
    return new StringJsonMessageConverter();
}

Spring Boot auto-configures this converter into the listener container factory, and you also have a JsonDeserializer, so, as I said, you are deserializing twice. I was able to reproduce the problem by adding that message converter bean to your stripped-down app.

You can either use a different factory for that listener, or override the deserializer on the listener itself...

@KafkaListener(id = ProductTopicConstants.GET_PRODUCT, topics = ProductTopicConstants.GET_PRODUCT,
        clientIdPrefix = "getProd",
        properties = ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG
            + "=org.apache.kafka.common.serialization.StringDeserializer")
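
The `properties` attribute of `@KafkaListener` takes plain strings in Java properties-file format, so the concatenation above is simply `value.deserializer=...`. As a rough illustration, assuming only the JDK, the sketch below builds the same string and parses it with `java.util.Properties`. The class name and the `parse` helper are hypothetical, and the key is inlined as a literal instead of importing `ConsumerConfig`.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical helper class; it only shows how a "key=value" entry is read.
final class ListenerPropertySketch {

    // Literal value of ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
    // inlined here so the sketch needs no Kafka jar on the classpath.
    static final String VALUE_DESERIALIZER_KEY = "value.deserializer";

    /** Parses one properties-file style entry into a Properties object. */
    static Properties parse(String entry) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(entry));
        } catch (IOException e) {
            // A StringReader does not fail in practice; rethrow unchecked for brevity.
            throw new UncheckedIOException(e);
        }
        return props;
    }

    public static void main(String[] args) {
        String entry = VALUE_DESERIALIZER_KEY
                + "=org.apache.kafka.common.serialization.StringDeserializer";
        Properties props = parse(entry);
        // Prints the deserializer class name that this listener would use.
        System.out.println(props.getProperty(VALUE_DESERIALIZER_KEY));
    }
}
```

A property set this way applies to that one listener only, which is why it is enough to stop the double deserialization without touching the shared consumer factory.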

Answer 2

Score: 0

Try changing JsonDeserializer to StringDeserializer.

For custom objects, you should configure your consumer factory so that it knows the target type:

@Bean
public ConsumerFactory<String, MyClass> consumerFactory() {
    return new DefaultKafkaConsumerFactory<>(consumerConfigs(),
            new StringDeserializer(),
            new JsonDeserializer<>(MyClass.class));
}

huangapple
  • Posted on July 26, 2020, 19:59:43
  • When reposting, please keep the link to this article: https://go.coder-hub.com/63099836.html