Failed to convert from JSON;Unexpected character ('f' (code 102)): Expected space separating root-level values at (String)"5f19a7e99933db43cb23e83d"

Question

    @KafkaListener(id = ProductTopicConstants.GET_PRODUCT, topics = ProductTopicConstants.GET_PRODUCT)
    @SendTo
    public Product GetProduct(String id) {
        return _productRepository.findByid(id);
    }

Kafka Configuration

    @Configuration
    public class KafkaConfiguration {

        @Bean
        public Map<String, Object> consumerConfigs() {
            Map<String, Object> props = new HashMap<>();
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
            return props;
        }

        @Bean
        public ConsumerFactory<String, Object> consumerFactory() {
            return new DefaultKafkaConsumerFactory<>(consumerConfigs(),
                    new StringDeserializer(),
                    new JsonDeserializer<>(Object.class));
        }

        @Bean
        public ReplyingKafkaTemplate<String, Object, Object> replyer(ProducerFactory<String, Object> pf,
                ConcurrentKafkaListenerContainerFactory<String, Object> containerFactory) {
            containerFactory.setReplyTemplate(kafkaTemplate(pf));
            ConcurrentMessageListenerContainer<String, Object> container = replyContainer(containerFactory);
            ReplyingKafkaTemplate<String, Object, Object> replyer = new ReplyingKafkaTemplate<>(pf, container);
            return replyer;
        }

        @Bean
        public ConcurrentMessageListenerContainer<String, Object> replyContainer(
                ConcurrentKafkaListenerContainerFactory<String, Object> containerFactory) {
            ConcurrentMessageListenerContainer<String, Object> container =
                    containerFactory.createContainer(ProductTopicConstants.LISTNER_CONTAINER);
            container.getContainerProperties().setGroupId(ProductTopicConstants.LISTNER_CONTAINER);
            containerFactory.setMessageConverter(new JsonMessageConverter());
            container.setBatchErrorHandler(new BatchLoggingErrorHandler());
            return container;
        }

        @Bean
        public KafkaTemplate<String, Object> kafkaTemplate(ProducerFactory<String, Object> pf) {
            return new KafkaTemplate<>(pf);
        }
    }

Kafka Producer

    @Override
    public ProductViewModel GetProduct(String id) throws InterruptedException, ExecutionException, TimeoutException {
        RequestReplyFuture<String, Object, Object> future =
                this._replyTemplate.sendAndReceive(new ProducerRecord<>(ProductTopicConstants.GET_PRODUCT, 0, null, id));
        LOG.info(future.getSendFuture().get(10, TimeUnit.SECONDS).getRecordMetadata().toString());
        var products = (Product) future.get(10, TimeUnit.SECONDS).value();
        var mappedProducts = mapper.convertValue(products, new TypeReference<Product>() { });
        return new ProductViewModel(mappedProducts.getId(), mappedProducts.getName(), mappedProducts.getPrice(), mappedProducts.getDescription());
    }

Kafka Configuration

    spring:
      profiles:
        active: dev
      kafka:
        bootstrapAddress: localhost:9092
        producer:
          key-serializer: org.springframework.kafka.support.serializer.JsonSerializer
          value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
        consumer:
          key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
          value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
          auto-offset-reset: earliest
          properties:
            spring:
              json:
                trusted:
                  packages: '*'

Topic

    @Bean
    public static NewTopic GetProduct() {
        return new NewTopic(ProductTopicConstants.GET_PRODUCT, 1, (short) 1);
    }

    @Bean
    public NewTopic GetProducts() {
        return TopicBuilder.name(ProductTopicConstants.GET_PRODUCTS).partitions(1).replicas(1).build();
    }

    @Bean
    public NewTopic GetProductsContainer() {
        return TopicBuilder.name(ProductTopicConstants.LISTNER_CONTAINER).partitions(1).replicas(1).build();
    }

Error

    org.springframework.kafka.KafkaException: Seek to current after exception; nested exception is org.springframework.kafka.listener.ListenerExecutionFailedException: Listener failed; nested exception is org.springframework.kafka.support.converter.ConversionException: Failed to convert from JSON; nested exception is com.fasterxml.jackson.core.JsonParseException: Unexpected character ('f' (code 102)): Expected space separating root-level values
     at [Source: (String)"5f19a7e99933db43cb23e83d"; line: 1, column: 3]
        at org.springframework.kafka.listener.SeekUtils.seekOrRecover(SeekUtils.java:157) ~[spring-kafka-2.5.2.RELEASE.jar:2.5.2.RELEASE]
        at org.springframework.kafka.listener.SeekToCurrentErrorHandler.handle(SeekToCurrentErrorHandler.java:103) ~[spring-kafka-2.5.2.RELEASE.jar:2.5.2.RELEASE]
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeErrorHandler(KafkaMessageListenerContainer.java:1887) ~[spring-kafka-2.5.2.RELEASE.jar:2.5.2.RELEASE]
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:1792) ~[spring-kafka-2.5.2.RELEASE.jar:2.5.2.RELEASE]
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:1719) ~[spring-kafka-2.5.2.RELEASE.jar:2.5.2.RELEASE]
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:1617) ~[spring-kafka-2.5.2.RELEASE.jar:2.5.2.RELEASE]
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:1348) ~[spring-kafka-2.5.2.RELEASE.jar:2.5.2.RELEASE]
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1064) ~[spring-kafka-2.5.2.RELEASE.jar:2.5.2.RELEASE]
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:972) ~[spring-kafka-2.5.2.RELEASE.jar:2.5.2.RELEASE]
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[na:na]
        at java.base/java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:264) ~[na:na]
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java) ~[na:na]
        at java.base/java.lang.Thread.run(Thread.java:832) ~[na:na]
    Caused by: org.springframework.kafka.listener.ListenerExecutionFailedException: Listener failed; nested exception is org.springframework.kafka.support.converter.ConversionException: Failed to convert from JSON; nested exception is com.fasterxml.jackson.core.JsonParseException: Unexpected character ('f' (code 102)): Expected space separating root-level values
     at [Source: (String)"5f19a7e99933db43cb23e83d"; line: 1, column: 3]
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.decorateException(KafkaMessageListenerContainer.java:1902) ~[spring-kafka-2.5.2.RELEASE.jar:2.5.2.RELEASE]
        ... 11 common frames omitted
    Caused by: org.springframework.kafka.support.converter.ConversionException: Failed to convert from JSON; nested exception is com.fasterxml.jackson.core.JsonParseException: Unexpected character ('f' (code 102)): Expected space separating root-level values
     at [Source: (String)"5f19a7e99933db43cb23e83d"; line: 1, column: 3]
        at org.springframework.kafka.support.converter.JsonMessageConverter.extractAndConvertValue(JsonMessageConverter.java:117) ~[spring-kafka-2.5.2.RELEASE.jar:2.5.2.RELEASE]
        at org.springframework.kafka.support.converter.MessagingMessageConverter.toMessage(MessagingMessageConverter.java:123) ~[spring-kafka-2.5.2.RELEASE.jar:2.5.2.RELEASE]
        at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.toMessagingMessage(MessagingMessageListenerAdapter.java:309) ~[spring-kafka-2.5.2.RELEASE.jar:2.5.2.RELEASE]
        at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:77) ~[spring-kafka-2.5.2.RELEASE.jar:2.5.2.RELEASE]
        at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:51) ~[spring-kafka-2.5.2.RELEASE.jar:2.5.2.RELEASE]
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:1854) ~[spring-kafka-2.5.2.RELEASE.jar:2.5.2.RELEASE]
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:1836) ~[spring-kafka-2.5.2.RELEASE.jar:2.5.2.RELEASE]
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:1779) ~[spring-kafka-2.5.2.RELEASE.jar:2.5.2.RELEASE]
        ... 9 common frames omitted
    Caused by: com.fasterxml.jackson.core.JsonParseException: Unexpected character ('f' (code 102)): Expected space separating root-level values
     at [Source: (String)"5f19a7e99933db43cb23e83d"; line: 1, column: 3]
        at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1851) ~[jackson-core-2.11.0.jar:2.11.0]
        at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:707) ~[jackson-core-2.11.0.jar:2.11.0]
        at com.fasterxml.jackson.core.base.ParserMinimalBase._reportUnexpectedChar(ParserMinimalBase.java:632) ~[jackson-core-2.11.0.jar:2.11.0]
        at com.fasterxml.jackson.core.base.ParserMinimalBase._reportMissingRootWS(ParserMinimalBase.java:679) ~[jackson-core-2.11.0.jar:2.11.0]
        at com.fasterxml.jackson.core.json.ReaderBasedJsonParser._verifyRootSpace(ReaderBasedJsonParser.java:1703) ~[jackson-core-2.11.0.jar:2.11.0]
        at com.fasterxml.jackson.core.json.ReaderBasedJsonParser._parsePosNumber(ReaderBasedJsonParser.java:1346) ~[jackson-core-2.11.0.jar:2.11.0]
        at com.fasterxml.jackson.core.json.ReaderBasedJsonParser.nextToken(ReaderBasedJsonParser.java:773) ~[jackson-core-2.11.0.jar:2.11.0]
        at com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:4620) ~[jackson-databind-2.11.0.jar:2.11.0]
        at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:4469) ~[jackson-databind-2.11.0.jar:2.11.0]
        at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3434) ~[jackson-databind-2.11.0.jar:2.11.0]
        at org.springframework.kafka.support.converter.JsonMessageConverter.extractAndConvertValue(JsonMessageConverter.java:114) ~[spring-kafka-2.5.2.RELEASE.jar:2.5.2.RELEASE]
        ... 16 common frames omitted
    2020-07-26 21:52:47.026 INFO 3997 --- [etProduct-0-C-1] o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-GetProduct-4, groupId=GetProduct] Seeking to offset 25 for partition GetProduct-0
    2020-07-26 21:52:47.029 ERROR 3997 --- [etProduct-0-C-1] essageListenerContainer$ListenerConsumer : Error handler threw an exception

Output in Consumer console

Failed to convert from JSON;Unexpected character ('f' (code 102)): Expected space separating root-level values at (String)"5f19a7e99933db43cb23e83d"

I have tried every combination I could think of, but none of them works.

Answer 1

Score: 1

You need to show the consumer-side configuration. You are doing double JSON conversion: a JsonMessageConverter is being applied, which is unnecessary because the JsonDeserializer has already converted the JSON to a String. Either remove the converter, or replace the deserializer with a plain StringDeserializer or ByteArrayDeserializer so that the converter has something to convert.
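
To make the double conversion concrete, here is a dependency-free sketch of the two decoding passes. It is an illustration only: `decodeJsonString` is a hypothetical, deliberately tiny stand-in for a JSON parser that accepts nothing but a quoted string literal without escapes, and it is not how JsonDeserializer or JsonMessageConverter are implemented. It also assumes the producer's JsonSerializer wrote the id as a quoted JSON string. The first pass plays the role of the deserializer, the second the role of the converter.

```java
public class DoubleDecodeDemo {

    /** Hypothetical mini-parser: accepts only a JSON string literal without escapes. */
    public static String decodeJsonString(String json) {
        boolean quoted = json.length() >= 2
                && json.charAt(0) == '"'
                && json.charAt(json.length() - 1) == '"';
        if (!quoted) {
            throw new IllegalArgumentException("Not a JSON string literal: " + json);
        }
        return json.substring(1, json.length() - 1);
    }

    public static void main(String[] args) {
        // Assumed wire format: the id serialized as JSON, i.e. wrapped in quotes.
        String wire = "\"5f19a7e99933db43cb23e83d\"";

        // Pass 1 (deserializer role): JSON text becomes a plain Java String.
        String afterDeserializer = decodeJsonString(wire);
        System.out.println("after first pass: " + afterDeserializer);

        // Pass 2 (converter role): the input is no longer JSON, so decoding fails.
        try {
            decodeJsonString(afterDeserializer);
        } catch (IllegalArgumentException e) {
            System.out.println("second pass failed: " + e.getMessage());
        }
    }
}
```

A real JSON parser fails a little differently on the unquoted value: it reads the leading `5` as a number and then rejects the `f`, which matches the "Unexpected character ('f' (code 102))" message at column 3 in the stack trace.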

EDIT

So, the problem is that your ProductListener declares this bean:

    @Bean
    public RecordMessageConverter converter() {
        return new StringJsonMessageConverter();
    }

Spring Boot auto-configures that bean into the listener container factory, and you also have a JsonDeserializer, so, as I said, you are doing double deserialization. I was able to reproduce the problem by adding that message converter bean to your stripped-down app.

You can either use a different factory for that listener, or override the deserializer on the listener itself:

    @KafkaListener(id = ProductTopicConstants.GET_PRODUCT, topics = ProductTopicConstants.GET_PRODUCT,
            clientIdPrefix = "getProd",
            properties = ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG
                    + "=org.apache.kafka.common.serialization.StringDeserializer")
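
The `properties` attribute above is just a string in Java properties format, so the concatenation amounts to a single `key=value` entry. The sketch below uses only the JDK to show what that entry looks like once parsed. The class name `ListenerPropertyDemo` and the `parse` helper are made up for illustration, and the key is written out as the literal `value.deserializer`, which is the value of `ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG` in kafka-clients.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

public class ListenerPropertyDemo {

    // Literal value of ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG.
    public static final String VALUE_DESERIALIZER_KEY = "value.deserializer";

    /** Parses "key=value" text using the standard java.util.Properties rules. */
    public static Properties parse(String entry) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(entry));
        } catch (IOException e) {
            // A StringReader does not fail in practice; wrap to keep the signature simple.
            throw new UncheckedIOException(e);
        }
        return props;
    }

    public static void main(String[] args) {
        String entry = VALUE_DESERIALIZER_KEY
                + "=org.apache.kafka.common.serialization.StringDeserializer";
        Properties props = parse(entry);
        // Prints the deserializer class name that overrides the factory default.
        System.out.println(props.getProperty(VALUE_DESERIALIZER_KEY));
    }
}
```

With this override only the one listener reads the value as a raw String, and the auto-configured converter then performs the single JSON conversion.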

Answer 2

Score: 0

Try changing JsonDeserializer to StringDeserializer.

For custom objects, configure your consumer factory so that it knows the target type:

    @Bean
    public ConsumerFactory<String, MyClass> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs(),
                new StringDeserializer(),
                new JsonDeserializer<>(MyClass.class));
    }

huangapple
  • Published on July 26, 2020, 19:59:43
  • Source link: https://go.coder-hub.com/63099836.html